Describe the bug
A clear and concise description of what the bug is.
To Reproduce
Steps to reproduce the behavior:
- Run an MQTT Kafka broker.
- Run a repeating mosquitto_pub command:
mosquitto_pub --url mqtt://localhost:7183/zilla -q 1 --message "Hello, world $(date)" --repeat 10000 -d -i sim-1;
- See empty keys and empty messages in the mqtt-retained topic.
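To put a number on the empty keys and messages, a small filter like the one below might help. This is only a sketch: `count_empty_records` is a hypothetical helper, it expects one record per line as tab-separated key and value, and the kcat invocation in the comment assumes a Kafka broker at localhost:9092, which this report does not state.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads "key<TAB>value" lines on stdin and reports how
# many records have an empty key or an empty value.
# Payloads that themselves contain tabs or newlines would skew the counts.
count_empty_records() {
  awk -F'\t' '
    $1 == "" { empty_key++ }
    $2 == "" { empty_value++ }
    END { printf "records=%d empty_key=%d empty_value=%d\n", NR, empty_key, empty_value }
  '
}

# Possible usage (broker address is an assumption, adjust to your setup):
#   kcat -C -e -b localhost:9092 -t mqtt-retained -f '%k\t%s\n' | count_empty_records
```

A non-zero empty_key or empty_value after the publish run would confirm the symptom without inspecting the topic by hand.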
More load testing using 100 clients that each produce 500 messages, for an expected total of 50k messages:
QoS 2 produces empty messages in the retained topic and does not deliver all 50k messages (~500 msg/sec):
for i in {1..100} ; do
mosquitto_pub --url mqtt://localhost:7183/qos2 -q 2 --message "QOS 2" --repeat 500 -d -i "sim-$i" >> "sim2-$i.out" &
done
QoS 1 produces empty messages in the retained topic but does deliver all 50k messages (~1k msg/sec):
for i in {1..100} ; do
mosquitto_pub --url mqtt://localhost:7183/qos1 -q 1 --message "QOS 1" --repeat 500 -d -i "sim-$i" >> "sim1-$i.out" &
done
The QoS 0 run crashes before it can finish:
for i in {1..100} ; do
mosquitto_pub --url mqtt://localhost:7183/qos0 -q 0 --message "QOS 0" --repeat 500 -d -i "sim-$i" >> "sim0-$i.out" &
done
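Since each loop above writes the mosquitto_pub debug output to sim*.out files, the number of published and acknowledged messages can be tallied from those logs. The sketch below assumes the debug lines contain "sending PUBLISH" and "received PUBACK" / "received PUBCOMP", which is how mosquitto_pub -d usually logs them. `count_lines` and `summarize` are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count lines matching a fixed string across log files.
# Prints 0 when nothing matches or no files exist.
count_lines() {
  local pattern="$1"
  shift
  if [ "$#" -eq 0 ]; then
    echo 0
    return
  fi
  cat "$@" 2>/dev/null | grep -c -F -- "$pattern" || true
}

# Hypothetical helper: summarize sent vs. acknowledged publishes for one run.
#   $1 = log file prefix (sim1, sim2), $2 = final ack packet (PUBACK, PUBCOMP)
summarize() {
  local prefix="$1" ack="$2"
  local sent acked
  sent=$(count_lines "sending PUBLISH" "$prefix"-*.out)
  acked=$(count_lines "received $ack" "$prefix"-*.out)
  echo "$prefix: sent=$sent acked=$acked"
}

# Possible usage once the background publishers have exited:
#   summarize sim1 PUBACK    # QoS 1
#   summarize sim2 PUBCOMP   # QoS 2
```

Because the loops append with >>, stale .out files from earlier runs should be removed first, otherwise the counts accumulate across runs. QoS 0 has no acknowledgement packet, so only the sent count is meaningful there.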
Exceptions encountered during load testing QoS 0:
org.agrona.concurrent.AgentTerminationException: java.lang.IllegalArgumentException: hostname can't be null
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:841)
at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalArgumentException: hostname can't be null
at java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:159)
at java.base/java.net.InetSocketAddress.<init>(InetSocketAddress.java:226)
at io.aklivity.zilla.runtime.binding.tcp@0.9.89/io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientRouter.resolveInetSocketAddress(TcpClientRouter.java:288)
at io.aklivity.zilla.runtime.binding.tcp@0.9.89/io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientRouter.resolve(TcpClientRouter.java:96)
at io.aklivity.zilla.runtime.binding.tcp@0.9.89/io.aklivity.zilla.runtime.binding.tcp.internal.stream.TcpClientFactory.newStream(TcpClientFactory.java:155)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleBeginInitial(EngineWorker.java:1576)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleDefaultReadInitial(EngineWorker.java:1340)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1280)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1227)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:835)
... 4 more
Suppressed: java.lang.Exception: [engine/data#3] [0x0303000000001115] streams=[consumeAt=0x0025e978 (0x000000000025e978), produceAt=0x0025f4d8 (0x000000000025f4d8)]
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:839)
... 4 more
org.agrona.concurrent.AgentTerminationException: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:841)
at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:100)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
at java.base/java.util.Objects.checkIndex(Objects.java:365)
at java.base/java.util.ArrayList.get(ArrayList.java:428)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerProduceFactory$KafkaCacheServerProduceFan.doServerFanInitialDataIfNecessary(KafkaCacheServerProduceFactory.java:650)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerProduceFactory$KafkaCacheServerProduceFan.onServerFanInitialWindow(KafkaCacheServerProduceFactory.java:827)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaCacheServerProduceFactory$KafkaCacheServerProduceFan.onServerFanMessage(KafkaCacheServerProduceFactory.java:874)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1292)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1227)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:835)
... 4 more
Suppressed: java.lang.Exception: [engine/data#3] [0x0303000000001a31] streams=[consumeAt=0x003681c8 (0x0000000000b681c8), produceAt=0x0036e948 (0x0000000000b6e948)]
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:839)
... 4 more
org.agrona.concurrent.AgentTerminationException: java.lang.IllegalStateException: Unable to write to streams buffer
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:841)
at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalStateException: Unable to write to streams buffer
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.stream.Target.handleWrite(Target.java:159)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory.doFlush(KafkaMergedFactory.java:910)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaUnmergedProduceStream.doProduceInitialFlush(KafkaMergedFactory.java:4059)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedFetchFlush(KafkaMergedFactory.java:1463)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedInitialFlush(KafkaMergedFactory.java:1389)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedMessage(KafkaMergedFactory.java:1142)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1271)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1227)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:835)
... 4 more
Suppressed: java.lang.Exception: [engine/data#0] [0x0000000000000a8d] streams=[consumeAt=0x001f49a0 (0x00000000005f49a0), produceAt=0x003ffff0 (0x00000000007ffff0)]
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:839)
... 4 more
org.agrona.concurrent.AgentTerminationException: java.lang.IllegalStateException: Unable to write to streams buffer
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:841)
at org.agrona.core/org.agrona.concurrent.AgentRunner.doWork(AgentRunner.java:304)
at org.agrona.core/org.agrona.concurrent.AgentRunner.workLoop(AgentRunner.java:296)
at org.agrona.core/org.agrona.concurrent.AgentRunner.run(AgentRunner.java:162)
at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.lang.IllegalStateException: Unable to write to streams buffer
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.stream.Target.handleWrite(Target.java:159)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory.doFlush(KafkaMergedFactory.java:910)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaUnmergedProduceStream.doProduceInitialFlush(KafkaMergedFactory.java:4059)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedFetchFlush(KafkaMergedFactory.java:1463)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedInitialFlush(KafkaMergedFactory.java:1389)
at io.aklivity.zilla.runtime.binding.kafka@0.9.89/io.aklivity.zilla.runtime.binding.kafka.internal.stream.KafkaMergedFactory$KafkaMergedStream.onMergedMessage(KafkaMergedFactory.java:1142)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleReadInitial(EngineWorker.java:1271)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.handleRead(EngineWorker.java:1227)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.concurent.ManyToOneRingBuffer.read(ManyToOneRingBuffer.java:229)
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:835)
... 4 more
Suppressed: java.lang.Exception: [engine/data#2] [0x0202000000000367] streams=[consumeAt=0x001da128 (0x00000000001da128), produceAt=0x0008a2c8 (0x000000000048a2c8)]
at io.aklivity.zilla.runtime.engine@0.9.89/io.aklivity.zilla.runtime.engine.internal.registry.EngineWorker.doWork(EngineWorker.java:839)
... 4 more