smallrye-reactive-messaging
smallrye-reactive-messaging copied to clipboard
Message sending never succeeds to Azure Service Bus
I made an issue previously regarding claims-based security #882. As indicated in that issue, the underlying connection issue does not seem to be due to the authentication mechanism, since I can now connect to the Azure service bus using the Vertx AMQP client (it does seem to be necessary to set the SASL mechanism to PLAIN
in order to connect, otherwise the connection works but any attempt to send messages fails due to a lack of rights).
However, I cannot send messages using an injected Smallrye channel, even though I can send them directly using the Vertx AMQP client using the same configuration.
To illustrate this, I set up a minimal Quarkus application, including only the Smallrye AMQP client:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>
In my configuration I include:
mp.messaging.outgoing.my-channel.connector=smallrye-amqp
mp.messaging.outgoing.my-channel.address=my-queue-name
mp.messaging.outgoing.my-channel.link-name=my-queue-name
mp.messaging.outgoing.my-channel.container-id=my-queue-name
mp.messaging.outgoing.my-channel.durable=true
mp.messaging.outgoing.my-channel.dynamic=false
mp.messaging.outgoing.my-channel.client-options-name=my-queue-options
quarkus.log.level=FINE
I produce the Vertx AMQP connection options (in order to force the PLAIN
SASL mechanism), in the first instance to connect to a local Artemis instance, via:
package org.acme;
import io.vertx.amqp.AmqpClientOptions;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Named;
@ApplicationScoped
public class ClientOptions {
public static String queueName = "my-queue-name";
@Produces
@Named("my-queue-options")
public AmqpClientOptions getNamedOptions() {
return new AmqpClientOptions()
.addEnabledSaslMechanism("PLAIN")
.setUsername("quarkus")
.setPassword("quarkus")
.setHost("localhost")
.setPort(5672)
.setContainerId(ClientOptions.queueName);
}
}
and I have a resource class which allows me to attempt message sending either via the injected Smallrye channel or directly via the Vertx AMQP client:
package org.acme;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpConnection;
import io.vertx.mutiny.amqp.AmqpMessageBuilder;
import io.vertx.mutiny.amqp.AmqpSender;
import org.eclipse.microprofile.reactive.messaging.Channel;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/messages/")
public class GreetingResource {
@Inject
@Channel("employees")
MutinyEmitter<String> emitter;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("smallrye")
public String hello() {
emitter.send("Hello from Smallrye").subscribe().with((event) -> {
System.out.println("Smallrye message sending succeeded");
}, (error) -> {
System.out.println("Smallrye message sending failed");
});
return "Hello Smallrye";
}
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("vertx")
public String hello1() {
AmqpClient.create((new ClientOptions()).getNamedOptions())
.connect()
.chain((AmqpConnection connection) -> {
return connection.createSender(ClientOptions.queueName, new AmqpSenderOptions()
.setDynamic(false)
.setLinkName(ClientOptions.queueName));
})
.chain((AmqpSender sender) -> {
return sender.sendWithAck(AmqpMessageBuilder.create()
.withBody("Hello from Vertx")
.build());
})
.subscribe()
.with((event) -> {
System.out.println("Vertx message sending succeeded");
}, (error) -> {
System.out.println("Vertx message sending failed");
});
return "Hello Vertx";
}
}
This gives the following logs on startup:
2021-02-09 14:36:01,138 DEBUG [io.qua.ver.cor.run.VertxCoreRecorder] (Quarkus Main Thread) Vertx has Native Transport Enabled: false
2021-02-09 14:36:01,156 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00226: Found incoming connectors: [smallrye-amqp]
2021-02-09 14:36:01,156 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00227: Found outgoing connectors: [smallrye-amqp]
2021-02-09 14:36:01,158 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00229: Channel manager initializing...
2021-02-09 14:36:01,166 DEBUG [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16200: Creating AMQP client from bean named 'my-channel'
2021-02-09 14:36:01,187 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00209: Beginning graph resolution, number of components detected: 2
2021-02-09 14:36:01,188 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00210: Graph resolution completed in 1260033 ns
2021-02-09 14:36:01,188 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00235: Beginning materialization
2021-02-09 14:36:01,232 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2021-02-09 14:36:01,237 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00236: Materialization completed in 48399939 ns
2021-02-09 14:36:01,245 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.numHeapArenas: 24
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.numDirectArenas: 24
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.pageSize: 8192
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.maxOrder: 1
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.chunkSize: 16384
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.tinyCacheSize: 512
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.smallCacheSize: 256
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.normalCacheSize: 64
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.maxCachedBufferCapacity: 32768
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.cacheTrimInterval: 8192
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.cacheTrimIntervalMillis: 0
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.useCacheForAllThreads: true
2021-02-09 14:36:01,246 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-0) -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
2021-02-09 14:36:01,288 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-0) -Dio.netty.allocator.type: pooled
2021-02-09 14:36:01,288 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-0) -Dio.netty.threadLocalDirectBufferSize: 0
2021-02-09 14:36:01,288 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-0) -Dio.netty.maxThreadLocalCharBufferSize: 16384
2021-02-09 14:36:01,289 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-9) -Djava.net.preferIPv4Stack: false
2021-02-09 14:36:01,289 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-9) -Djava.net.preferIPv6Addresses: false
2021-02-09 14:36:01,291 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-9) Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
2021-02-09 14:36:01,291 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-9) Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
2021-02-09 14:36:01,373 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-02-09 14:36:01,373 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-02-09 14:36:01,373 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.linkCapacity: 16
2021-02-09 14:36:01,373 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.ratio: 8
2021-02-09 14:36:01,378 DEBUG [io.net.buf.AbstractByteBuf] (vert.x-eventloop-thread-0) -Dio.netty.buffer.checkAccessible: true
2021-02-09 14:36:01,378 DEBUG [io.net.buf.AbstractByteBuf] (vert.x-eventloop-thread-0) -Dio.netty.buffer.checkBounds: true
2021-02-09 14:36:01,379 DEBUG [io.net.uti.ResourceLeakDetectorFactory] (vert.x-eventloop-thread-0) Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@25148e35
2021-02-09 14:36:01,387 INFO [io.quarkus] (Quarkus Main Thread) code-with-quarkus 1.0.0-SNAPSHOT on JVM (powered by Quarkus 1.11.1.Final) started in 1.999s. Listening on: http://localhost:8080
2021-02-09 14:36:01,388 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2021-02-09 14:36:01,388 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, mutiny, resteasy, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-amqp, vertx]
2021-02-09 14:36:01,982 FINE [org.apa.qpi.pro.eng.imp.SaslImpl] (vert.x-eventloop-thread-0) Handled outcome: SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT]
2021-02-09 14:36:02,002 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Open{ containerId='10a6c076bbbd', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.15.0}}
2021-02-09 14:36:02,004 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2021-02-09 14:36:02,130 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 14:36:02,130 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Attach{name='auto-0', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 14:36:02,131 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Flow{nextIncomingId=1, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
2021-02-09 14:36:02,133 DEBUG [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16222: Retrieved credits for channel `my-channel`: 1000
2021-02-09 14:36:16,410 DEBUG [io.net.han.cod.com.ZlibCodecFactory] (vert.x-eventloop-thread-9) -Dio.netty.noJdkZlibDecoder: false
2021-02-09 14:36:16,410 DEBUG [io.net.han.cod.com.ZlibCodecFactory] (vert.x-eventloop-thread-9) -Dio.netty.noJdkZlibEncoder: false
Calling the Smallrye message end point results in the following logs:
2021-02-09 14:36:16,458 DEBUG [io.sma.rea.mes.amqp] (executor-thread-1) SRMSG16211: Sending AMQP message to address `my-queue-name`
2021-02-09 14:36:16,468 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:16,469 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:16,469 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:16,472 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:16,472 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) Interceptor Context: org.jboss.resteasy.core.interception.jaxrs.ServerWriterInterceptorContext, Method : proceed
2021-02-09 14:36:16,472 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:16,472 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:16,556 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false}
Smallrye message sending succeeded
and calling the Vertx message end point gives:
2021-02-09 14:36:24,904 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) RESTEASY002315: PathInfo: /messages/vertx
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) Interceptor Context: org.jboss.resteasy.core.interception.jaxrs.ServerWriterInterceptorContext, Method : proceed
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 14:36:24,909 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 14:36:24,942 FINE [org.apa.qpi.pro.eng.imp.SaslImpl] (vert.x-eventloop-thread-0) Handled outcome: SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT]
2021-02-09 14:36:24,946 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Open{ containerId='10a6c076bbbd', hostname='null', maxFrameSize=131072, channelMax=65535, idleTimeOut=30000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=[sole-connection-for-container, DELAYED_DELIVERY, SHARED-SUBS, ANONYMOUS-RELAY], desiredCapabilities=null, properties={product=apache-activemq-artemis, version=2.15.0}}
2021-02-09 14:36:24,979 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 14:36:24,979 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Attach{name='my-queue-name', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='my-queue-name', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 14:36:24,979 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Flow{nextIncomingId=1, incomingWindow=2147483647, nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=1000, available=null, drain=false, echo=false, properties=null}
2021-02-09 14:36:24,989 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Accepted{}, batchable=false}
Vertx message sending succeeded
indicating that message sending using both the injected client and the direct Vertx AMQP client was successful (and I can see that two messages are present in the queue on the Artemis console). It is interesting to note that Smallrye seems to detect an incoming channel on startup, although no incoming channel is configured.
However, when I update my AMQP client configuration in order to connect to the Azure Servie Bus:
return new AmqpClientOptions()
.addEnabledSaslMechanism("PLAIN")
.setSsl(true)
.setUsername("ASB-SAS-Key-Name")
.setPassword("ASB-SAS-Key")
.setHost("my-service-bus.servicebus.windows.net")
.setPort(5671)
.setContainerId(ClientOptions.queueName);
The startup logs give:
2021-02-09 15:04:48,998 DEBUG [io.qua.ver.cor.run.VertxCoreRecorder] (Quarkus Main Thread) Vertx has Native Transport Enabled: false
2021-02-09 15:04:49,016 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00226: Found incoming connectors: [smallrye-amqp]
2021-02-09 15:04:49,016 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00227: Found outgoing connectors: [smallrye-amqp]
2021-02-09 15:04:49,018 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00229: Channel manager initializing...
2021-02-09 15:04:49,025 DEBUG [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16200: Creating AMQP client from bean named 'my-channel'
2021-02-09 15:04:49,048 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00209: Beginning graph resolution, number of components detected: 2
2021-02-09 15:04:49,049 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00210: Graph resolution completed in 1489754 ns
2021-02-09 15:04:49,050 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00235: Beginning materialization
2021-02-09 15:04:49,100 INFO [io.sma.rea.mes.amqp] (Quarkus Main Thread) SRMSG16212: Establishing connection with AMQP broker
2021-02-09 15:04:49,105 DEBUG [io.sma.rea.mes.provider] (Quarkus Main Thread) SRMSG00236: Materialization completed in 55342931 ns
2021-02-09 15:04:49,130 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.numHeapArenas: 24
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.numDirectArenas: 24
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.pageSize: 8192
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.maxOrder: 1
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.chunkSize: 16384
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.tinyCacheSize: 512
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.smallCacheSize: 256
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.normalCacheSize: 64
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.maxCachedBufferCapacity: 32768
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.cacheTrimInterval: 8192
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.cacheTrimIntervalMillis: 0
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.useCacheForAllThreads: true
2021-02-09 15:04:49,131 DEBUG [io.net.buf.PooledByteBufAllocator] (vert.x-eventloop-thread-16) -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
2021-02-09 15:04:49,180 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-16) -Dio.netty.allocator.type: pooled
2021-02-09 15:04:49,180 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-16) -Dio.netty.threadLocalDirectBufferSize: 0
2021-02-09 15:04:49,180 DEBUG [io.net.buf.ByteBufUtil] (vert.x-eventloop-thread-16) -Dio.netty.maxThreadLocalCharBufferSize: 16384
2021-02-09 15:04:49,181 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-16) -Djava.net.preferIPv4Stack: false
2021-02-09 15:04:49,181 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-16) -Djava.net.preferIPv6Addresses: false
2021-02-09 15:04:49,182 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-16) Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
2021-02-09 15:04:49,183 DEBUG [io.net.uti.NetUtil] (vert.x-eventloop-thread-16) Failed to get SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
2021-02-09 15:04:49,262 DEBUG [io.net.han.ssl.JdkSslContext] (vert.x-eventloop-thread-0) Default protocols (JDK): [TLSv1.2, TLSv1.1, TLSv1]
2021-02-09 15:04:49,262 DEBUG [io.net.han.ssl.JdkSslContext] (vert.x-eventloop-thread-0) Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
2021-02-09 15:04:49,428 INFO [io.quarkus] (Quarkus Main Thread) code-with-quarkus 1.0.0-SNAPSHOT on JVM (powered by Quarkus 1.11.1.Final) started in 2.093s. Listening on: http://localhost:8080
2021-02-09 15:04:49,428 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2021-02-09 15:04:49,428 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, mutiny, resteasy, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-amqp, vertx]
2021-02-09 15:04:49,474 DEBUG [io.net.buf.AbstractByteBuf] (vert.x-eventloop-thread-0) -Dio.netty.buffer.checkAccessible: true
2021-02-09 15:04:49,474 DEBUG [io.net.buf.AbstractByteBuf] (vert.x-eventloop-thread-0) -Dio.netty.buffer.checkBounds: true
2021-02-09 15:04:49,474 DEBUG [io.net.uti.ResourceLeakDetectorFactory] (vert.x-eventloop-thread-0) Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@4be2000c
2021-02-09 15:04:49,478 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.maxCapacityPerThread: 4096
2021-02-09 15:04:49,478 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.maxSharedCapacityFactor: 2
2021-02-09 15:04:49,479 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.linkCapacity: 16
2021-02-09 15:04:49,479 DEBUG [io.net.uti.Recycler] (vert.x-eventloop-thread-0) -Dio.netty.recycler.ratio: 8
2021-02-09 15:04:49,692 DEBUG [io.net.han.ssl.SslHandler] (vert.x-eventloop-thread-0) [id: 0x3ad44338, L:/10.5.0.2:51669 - R:my-service-bus.servicebus.windows.net/65.52.128.246:5671] HANDSHAKEN: protocol:TLSv1.2 cipher suite:TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
2021-02-09 15:04:49,800 FINE [org.apa.qpi.pro.eng.imp.SaslImpl] (vert.x-eventloop-thread-0) Handled outcome: SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT]
2021-02-09 15:04:49,821 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Open{ containerId='164da4dd821145dd9d3cf52f03bc8653_G11', hostname='null', maxFrameSize=32768, channelMax=4999, idleTimeOut=240000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:04:49,823 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
2021-02-09 15:04:49,851 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=65535, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:04:50,083 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Attach{name='auto-0', handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:04:50,087 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:164da4dd821145dd9d3cf52f03bc8653_G11, SystemTracker:gateway7, Timestamp:2021-02-09T14:04:49', info=null}}
We can see that there is a Detach
event with an error, immediately on startup (perhaps to do with the incoming channel that Smallrye seems to have found) but presumably this should not have any thing to do with message sending since that will be a separate on-demand connection.
A minute later (presumably some kind of timeout, probably on Azure Service Bus side for inactivity in an outgoing channel), we get:
2021-02-09 15:05:49,879 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : End{error=null}
2021-02-09 15:05:49,880 ERROR [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16214: AMQP Connection failure: java.lang.Exception: Connection disconnected
at io.vertx.amqp.impl.AmqpConnectionImpl.onDisconnect(AmqpConnectionImpl.java:145)
at io.vertx.amqp.impl.AmqpConnectionImpl.lambda$null$3(AmqpConnectionImpl.java:101)
at io.vertx.proton.impl.ProtonConnectionImpl.lambda$getDefaultSession$6(ProtonConnectionImpl.java:263)
at io.vertx.proton.impl.ProtonSessionImpl.fireRemoteClose(ProtonSessionImpl.java:276)
at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:131)
at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:379)
at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:357)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
When running the same end points as before (this time with the Vertx end point first), we get the following logs:
2021-02-09 15:12:04,906 DEBUG [io.net.han.cod.com.ZlibCodecFactory] (vert.x-eventloop-thread-16) -Dio.netty.noJdkZlibDecoder: false
2021-02-09 15:12:04,906 DEBUG [io.net.han.cod.com.ZlibCodecFactory] (vert.x-eventloop-thread-16) -Dio.netty.noJdkZlibEncoder: false
2021-02-09 15:12:04,946 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) RESTEASY002315: PathInfo: /messages/vertx
2021-02-09 15:12:04,966 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:12:04,966 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 15:12:04,966 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 15:12:04,968 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:12:04,968 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) Interceptor Context: org.jboss.resteasy.core.interception.jaxrs.ServerWriterInterceptorContext, Method : proceed
2021-02-09 15:12:04,969 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:12:04,969 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 15:12:05,197 DEBUG [io.net.han.ssl.SslHandler] (vert.x-eventloop-thread-0) [id: 0x912a5065, L:/10.5.0.2:53616 - R:sb-weu-dev-mypad-001.servicebus.windows.net/65.52.128.246:5671] HANDSHAKEN: protocol:TLSv1.2 cipher suite:TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
2021-02-09 15:12:05,241 FINE [org.apa.qpi.pro.eng.imp.SaslImpl] (vert.x-eventloop-thread-0) Handled outcome: SaslImpl [_outcome=PN_SASL_OK, state=PN_SASL_PASS, done=true, role=CLIENT]
2021-02-09 15:12:05,259 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Open{ containerId='bb04345351ef474698415c453b09377a_G12', hostname='null', maxFrameSize=32768, channelMax=4999, idleTimeOut=240000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:12:05,277 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=65535, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:12:05,281 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Attach{name='my-queue-name', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='my-queue-name', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=262144, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 15:12:05,289 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1, outgoingWindow=65535, handle=0, deliveryCount=0, linkCredit=1000, available=0, drain=false, echo=false, properties=null}
2021-02-09 15:12:05,324 FINE [pro.trace] (vert.x-eventloop-thread-0) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=null, settled=true, state=Accepted{}, batchable=false}
Vertx message sending succeeded
and:
2021-02-09 15:13:18,915 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) RESTEASY002315: PathInfo: /messages/smallrye
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) Interceptor Context: org.jboss.resteasy.core.interception.jaxrs.ServerWriterInterceptorContext, Method : proceed
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.core.providerfactory.SortedKey
2021-02-09 15:13:18,919 DEBUG [org.jbo.res.res.i18n] (executor-thread-1) MessageBodyWriter: org.jboss.resteasy.plugins.providers.StringTextStar
So in this scenario sending the message via the Smallrye channel never completes (either by sending or by erroring), and we do not see any AMQP-relevant logs which lead me to believe that sending is not even attempted.
Since Smallrye Messaging should be able to do anything the underlying Vertx AMQP client can do, it seems to me that this problem must come from some difference in configuration which Smallrye is using, but I cannot figure out what this might be. Is the fact that it is finding an incoming channel (even though none is configured) causing the problem? Any help as to how to configure my channels so that they function correctly both with my local Artemis instance and with the Azure Service Bus would be greatly appreciated.
@gemmellr any idea?
I don't have access to Azure Service Bus, anything I can run locally?
Just saw this. Some comments/thoughts from quick skim before I head off.
I dont have access either, and I'm not aware of anything you can try locally. There was a local Service Bus version originally, but they killed it some years ago (it wasnt the same anyway AFAIK).
The logs arent that helpful on their own. If you set the PN_TRACE_FRM environment variable (not system property) to 1 or true then proton-j will emit protocol trace detail to stdout. You can then more easily see everything that happens protocol wise, and ways in which different cases differ.
The vertx-amqp-client example code uses a producer to a specific fixed destination, while the reactive-messaging based application is probably defaulting to trying an anonymous-sender link used to send messages to any destination. Yep, the small amount of detail tehre confirms that. You can disable that with the use-anonymous-sender config to see if it helps.
I know ServceBus doesnt announce the connection-capability for indicating support of the anonymous-relay node, but I dont know for sure whether it supports the mechanism (their other bits certainly do something similar). I was under the impression it did, but its not something I've tried, so maybe not. Either way it is a clear difference between the two cases.
Setting dynamic false is superfluous.
Ah, missed the SASL point. Based on the description of needing to set PLAIN to be used, I expect that means ServiceBus is announcing the ANONYMOUS mech (which they use for CBS) before the PLAIN mech, so you needed to tell the client to use PLAIN since it would otherwise spot the ANONYMOUS first and use that.
Thanks for the comments.
Setting PN_TRACE_FRM
gives the following logs for startup:
[458328925:0] -> SASL
[458328925:0] <- SASL
[458328925:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, ANONYMOUS, EXTERNAL]}
[458328925:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00ASB-SAS-Key-Name\x00ASB-SAS-Key, hostname='null'}
[458328925:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[458328925:0] -> AMQP
[458328925:0] -> Open{ containerId='my-queue-name', hostname='my-service-bus.servicebus.windows.net', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.5}}
[458328925:0] <- AMQP
[458328925:0] <- Open{ containerId='840f4a5f7c20441e82256fc7d0d1ef77_G9', hostname='null', maxFrameSize=32768, channelMax=4999, idleTimeOut=240000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 20:21:19,616 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
[458328925:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[458328925:0] -> Attach{name='auto-0', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[458328925:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=65535, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[458328925:0] <- Attach{name='auto-0', handle=0, role=RECEIVER, sndSettleMode=MIXED, rcvSettleMode=FIRST, source=null, target=null, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[458328925:0] <- Detach{handle=0, closed=true, error=Error{condition=amqp:internal-error, description='The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:840f4a5f7c20441e82256fc7d0d1ef77_G9, SystemTracker:gateway7, Timestamp:2021-02-09T19:21:19', info=null}}
[458328925:0] -> Detach{handle=0, closed=true, error=null}
[458328925:0] <- End{error=null}
followed shortly later by [458328925:0] -> Close{error=null}
.
Sending via the Vertx AMQP client then gives:
[1201387136:0] -> SASL
[1201387136:0] <- SASL
[1201387136:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, ANONYMOUS, EXTERNAL]}
[1201387136:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00ASB-SAS-Key-Name\xASB-SAS-Key, hostname='null'}
[1201387136:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[1201387136:0] -> AMQP
[1201387136:0] -> Open{ containerId='my-queue-name', hostname='my-service-bus.servicebus.windows.net', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.5}}
[1201387136:0] <- AMQP
[1201387136:0] <- Open{ containerId='bb04345351ef474698415c453b09377a_G12', hostname='null', maxFrameSize=32768, channelMax=4999, idleTimeOut=240000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1201387136:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1201387136:0] -> Attach{name='my-queue-name', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='my-queue-name', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1201387136:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=65535, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1201387136:0] <- Attach{name='sbt-employee-updates', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='my-queue-name', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=262144, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1201387136:0] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1, outgoingWindow=65535, handle=0, deliveryCount=0, linkCredit=1000, available=0, drain=false, echo=false, properties=null}
[1201387136:0] -> Transfer{handle=0, deliveryId=0, deliveryTag=\x00\x00\x00\x01, messageFormat=0, settled=false, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false} (57) "\x00Ss\xd0\x00\x00\x00\x1c\x00\x00\x00\x03@@\xa1\x14my-queue-name\x00Sw\xa1\x10Hello from Vertx"
[1201387136:0] <- Disposition{role=RECEIVER, first=0, last=null, settled=true, state=Accepted{}, batchable=false}
Vertx message sending succeeded
and sending via Smallrye gives only:
[1201387136:0] <- Flow{nextIncomingId=2, incomingWindow=5000, nextOutgoingId=1, outgoingWindow=65535, handle=null, deliveryCount=null, linkCredit=null, available=null, drain=false, echo=true, properties=null}
after a while, followed by:
[1201387136:0] -> Empty Frame
on a second attempt (after a small wait), and the same after a third attempt. N.B. I disabled FINE
logs.
Disabling anonymous senders by adding:
mp.messaging.outgoing.my-channel.use-anonymous-sender=false
to the configuration gives the following startup:
[1340753317:0] -> SASL
[1340753317:0] <- SASL
[1340753317:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, ANONYMOUS, EXTERNAL]}
[1340753317:0] -> SaslInit{mechanism=PLAIN, initialResponse=\x00ASB-SAS-Key-Name\x00ASB-SAS-Key, hostname='null'}
[1340753317:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[1340753317:0] -> AMQP
[1340753317:0] -> Open{ containerId='my-queue-name', hostname='my-service-bus.servicebus.windows.net', maxFrameSize=32768, channelMax=65535, idleTimeOut=null, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties={product=vertx-amqp-client, version=3.9.5}}
[1340753317:0] <- AMQP
[1340753317:0] <- Open{ containerId='5274cd8caf3a4dc389c36f31218c727c_G4', hostname='null', maxFrameSize=32768, channelMax=4999, idleTimeOut=240000, outgoingLocales=null, incomingLocales=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
2021-02-09 20:36:09,988 INFO [io.sma.rea.mes.amqp] (vert.x-eventloop-thread-0) SRMSG16213: Connection with AMQP broker established
[1340753317:0] -> Begin{remoteChannel=null, nextOutgoingId=1, incomingWindow=65535, outgoingWindow=2147483647, handleMax=65535, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1340753317:0] -> Attach{name='sbt-employee-updates', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='sbt-employee-updates', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1340753317:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000, outgoingWindow=65535, handleMax=255, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1340753317:0] <- Attach{name='sbt-employee-updates', handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='sbt-employee-updates', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null, maxMessageSize=262144, offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1340753317:0] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1, outgoingWindow=65535, handle=0, deliveryCount=0, linkCredit=1000, available=0, drain=false, echo=false, properties=null}
and attempting to send using the Smallrye client gives no result, whereas using the Vertx client works as before.
It seems to me that the connection made on startup by the Smallrye client must be blocking somewhere, so that attempts to send a message by it do not generate logs simply because no actual sending is attempted. Why does Smallrye connect on startup (detecting an incoming connector) even when there are no incoming connectors? To report health status perhaps?
I appreciate that you do not have access to a service bus connection — anything you think is worth trying I am happy to try and report back to you.
If I am interpreting
[1201387136:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN, ANONYMOUS, EXTERNAL]}
correctly, the service bus is announcing the PLAIN
SASL mechanism before ANONYMOUS
so the need to specify PLAIN
SASL must have another cause.
On the SASL side, yes it is announcing PLAIN before it. Given that, I dont actually see a reason you should need to specify PLAIN as enabled then, so long as the username and password values are set, which clearly they are or it still wouldnt work even once you specified it.
Vertx-amqp-client (or rather vertx-proton beneath it) simply iterates the offered list in the order offered, and picks the first one that it knows about and can do. It knows of PLAIN, ANONYMOUS, and EXTERNAL. In this case, it should find no match for the first offered proprietary mech, then try PLAIN, which it would pick so long as the username and password field are set. All that specifying specific enabled mechs does is have it additionally check first whether an offered mech is in the specified mechs before considering it. So if PLAIN works when specifying it, it seems it should also just work without doing so in this case.
(Aside: Service Bus should actually not even be offering EXTERNAL here at all unless you have a client certificate, it has a bug where it offers it regardless, as they misinterpreted what EXTERNAL meant at some point...it shouldn't cause any harm here since it is offered last though).
There must be something else at play then, because if I do not force PLAIN
then I cannot send messages (I get an unauthorised error, as if the anonymous connection is open to allow me to authenticate using CBS).
It does not surprise me that Microsoft is misinterpreting the standard with regards to EXTERNAL
- they are good at that!
I would be curious to know what it does then, I assume it somehow uses ANONYMOUS, even though I dont see a way for it to. There should presumably be a visible protocol trace difference to warrant an actual difference in behaviour, can you see what it is?
On the non-SASL element, the two scenarios in your last set of logs are sending to different places. Is it possible the configuration isnt as expected, and it isnt sending because some things dont align?
Vertx is establishing a sender to address "my-queue-name":
[1201387136:0] -> Attach{name='my-queue-name', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='my-queue-name', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
For Smallrye it is establishing a sender to address "sbt-employee-updates":
[1340753317:0] -> Attach{name='sbt-employee-updates', handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST, source=Source{address='null', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list], capabilities=null}, target=Target{address='sbt-employee-updates', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}, unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0, maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null, properties=null}
Ah, my bad, it will be the EXTERNAL thing causing the issue. I forgot the client actually locally sorts its preferred mechanisms. EXTERNAL being there will mean it tries it, unless you restrict it to PLAIN so it cant.
And my bad for the different addresses — it was bound to happen — I was attempting to clean all names from the actual names I used in my code, and I missed one, so the addresses are actually the same.