From 79bcea7484ec24dc78af11a90c0d073d5a5ff026 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 5 Sep 2024 11:55:30 +0200 Subject: [PATCH] Mqtt flow control fix --- .../internal/stream/MqttServerFactory.java | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 9a30be3a84..3e728870fc 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -1326,7 +1326,7 @@ private int decodePublishV4( // special case, when payload is empty -> wait for window break decode; } - server.publishPayloadBytes = decodeablePublishPayloadBytes; + server.decodeablePublishPayloadBytes = decodeablePublishPayloadBytes; server.decodeablePacketBytes = limit - publishLimit; server.decoder = decodePublishPayload; progress = publishLimit; @@ -1462,7 +1462,7 @@ private int decodePublishV5( // special case, when payload is empty -> wait for window break decode; } - server.publishPayloadBytes = decodeablePublishPayloadBytes; + server.decodeablePublishPayloadBytes = decodeablePublishPayloadBytes; server.decodeablePacketBytes = limit - publishLimit; server.decoder = decodePublishPayload; progress = publishLimit; @@ -1496,25 +1496,28 @@ private int decodePublishPayload( { MqttServer.MqttPublishStream publisher = server.publishes.get(server.decodePublisherKey); - int publishablePayloadSize = - Math.min(Math.min(server.publishPayloadBytes, publisher.initialBudget()), length); - - final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + publishablePayloadSize); + int initialBudget = publisher.initialBudget(); + int lengthMax = Math.min(length, server.decodeablePublishPayloadBytes); + int reservedMax = Math.max(publisher.initialPad, Math.min(lengthMax + publisher.initialPad, initialBudget)); boolean canPublish = MqttState.initialOpened(publisher.state); - final int maximum = publishablePayloadSize; - final int minimum = Math.min(maximum, Math.max(publisher.initialMin, 1024)); + final int maximum = reservedMax; + final int minimum = Math.min(maximum, Math.max(publisher.initialMin, 1024) + publisher.initialPad); int valueClaimed = maximum; - if (canPublish && publisher.debitorIndex != NO_DEBITOR_INDEX && publishablePayloadSize != 0) + if (canPublish && publisher.debitorIndex != NO_DEBITOR_INDEX && lengthMax != 0) { valueClaimed = publisher.debitor.claim(traceId, publisher.debitorIndex, publisher.initialId, minimum, maximum, 0); } - if (canPublish && (valueClaimed != 0 || payload.sizeof() == 0)) + int sizeClaimed = valueClaimed - publisher.initialPad; + + final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + sizeClaimed); + + if (canPublish && (sizeClaimed != 0 || payload.sizeof() == 0)) { if (server.publishPayloadDeferred == 0) { @@ -1525,12 +1528,12 @@ private int decodePublishPayload( server.onDecodePublishPayload(traceId, authorization, valueClaimed, server.decodedPacketId, server.decodedQos, server.decodedFlags, server.decodedExpiryInterval, server.decodedContentType, server.decodedPayloadFormat, server.decodedResponseTopic, server.decodedCorrelationData, server.decodedUserProperties, - payload, payload.offset(), payload.offset() + valueClaimed, publisher.contentType); + payload, payload.offset(), payload.offset() + sizeClaimed, publisher.contentType); - progress = payload.offset() + valueClaimed; + progress = payload.offset() + sizeClaimed; - if (server.publishPayloadBytes == 0) + if (server.decodeablePublishPayloadBytes == 0) { server.decoder = decodePacketTypeByVersion.get(server.version); } @@ -2447,7 +2450,7 @@ private final class MqttServer private long decodePublisherKey; private int decodeablePacketBytes; private int publishPayloadDeferred; - public int publishPayloadBytes; + public int decodeablePublishPayloadBytes; private int willPayloadDeferred; public int willPayloadBytes; @@ -3266,7 +3269,7 @@ private void onDecodePublishPayload( if (publishPayloadDeferred == 0) { - publishPayloadDeferred = publishPayloadBytes - length; + publishPayloadDeferred = decodeablePublishPayloadBytes - length; final Flyweight dataEx = mqttPublishDataExRW.wrap(dataExtBuffer, 0, dataExtBuffer.capacity()) .typeId(mqttTypeId) .publish(p -> @@ -3292,7 +3295,7 @@ private void onDecodePublishPayload( { stream.doPublishData(traceId, authorization, reserved, packetId, payload, dataFlags, offset, limit, dataEx); - publishPayloadBytes -= length; + decodeablePublishPayloadBytes -= length; } } else @@ -3305,13 +3308,13 @@ private void onDecodePublishPayload( { stream.doPublishData(traceId, authorization, reserved, packetId, payload, dataFlags, offset, limit, EMPTY_OCTETS); - publishPayloadBytes -= length; + decodeablePublishPayloadBytes -= length; } } } else { - publishPayloadBytes -= length; + decodeablePublishPayloadBytes -= length; } } } @@ -6009,7 +6012,7 @@ private void onPublishExpiredSignal( final long traceId = signal.traceId(); final long now = System.currentTimeMillis(); - if (now >= publishExpiresAt && publishPayloadBytes == 0) + if (now >= publishExpiresAt && decodeablePublishPayloadBytes == 0) { doPublishAppEnd(traceId); } @@ -6131,7 +6134,7 @@ private void cleanupAbort( private int initialBudget() { - return initialMax - (int)(initialSeq - initialAck) - initialPad; + return initialMax - (int)(initialSeq - initialAck); } }