zilla
zilla copied to clipboard
Running `emqtt_bench` both `sub` and `pub` triggers an exception
Describe the bug A clear and concise description of what the bug is.
To Reproduce Steps to reproduce the behavior:
- Start https://github.com/aklivity/zilla-examples/tree/main/mqtt.kafka.broker
- Download the emqtt_bench tool
- Run
./emqtt_bench pub -c 100 -I 10 -t bench/%i -s 256 --shortids -p 7183
- Run
./emqtt_bench sub -c 100 -t bench/%i -s 256 --shortids -p 7183
- See error
org.agrona.concurrent.AgentTerminationException: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory$MqttServer$MqttPublishStream.initialBudget()" because "publisher" is null
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:823)
at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.lang.NullPointerException: Cannot invoke "io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory$MqttServer$MqttPublishStream.initialBudget()" because "publisher" is null
at io.aklivity.zilla.runtime.binding.mqtt/io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory.decodePublishPayload(MqttServerFactory.java:1482)
at io.aklivity.zilla.runtime.binding.mqtt/io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory$MqttServer.decodeNetwork(MqttServerFactory.java:4804)
at io.aklivity.zilla.runtime.binding.mqtt/io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory$MqttServer.onNetworkData(MqttServerFactory.java:2612)
at io.aklivity.zilla.runtime.binding.mqtt/io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory$MqttServer.onNetwork(MqttServerFactory.java:2527)
at io.aklivity.zilla.runtime.binding.mqtt/io.aklivity.zilla.runtime.binding.mqtt.internal.stream.MqttServerFactory.lambda$newStream$1(MqttServerFactory.java:569)
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1238)
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1205)
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:817)
... 4 more
Suppressed: java.lang.Exception: [engine/data#4] [0x0404000000000a29] streams=[consumeAt=0x00175ed0 (0x0000000000175ed0), produceAt=0x00178a30 (0x0000000000178a30)]
at io.aklivity.zilla.runtime.engine/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:821)
... 4 more
Expected behavior No exception