Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ private int decodePublishV4(
// special case, when payload is empty -> wait for window
break decode;
}
server.publishPayloadBytes = decodeablePublishPayloadBytes;
server.decodeablePublishPayloadBytes = decodeablePublishPayloadBytes;
server.decodeablePacketBytes = limit - publishLimit;
server.decoder = decodePublishPayload;
progress = publishLimit;
Expand Down Expand Up @@ -1462,7 +1462,7 @@ private int decodePublishV5(
// special case, when payload is empty -> wait for window
break decode;
}
server.publishPayloadBytes = decodeablePublishPayloadBytes;
server.decodeablePublishPayloadBytes = decodeablePublishPayloadBytes;
server.decodeablePacketBytes = limit - publishLimit;
server.decoder = decodePublishPayload;
progress = publishLimit;
Expand Down Expand Up @@ -1496,25 +1496,28 @@ private int decodePublishPayload(
{
MqttServer.MqttPublishStream publisher = server.publishes.get(server.decodePublisherKey);

int publishablePayloadSize =
Math.min(Math.min(server.publishPayloadBytes, publisher.initialBudget()), length);

final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + publishablePayloadSize);
int initialBudget = publisher.initialBudget();
int lengthMax = Math.min(length, server.decodeablePublishPayloadBytes);
int reservedMax = Math.max(publisher.initialPad, Math.min(lengthMax + publisher.initialPad, initialBudget));

boolean canPublish = MqttState.initialOpened(publisher.state);

final int maximum = publishablePayloadSize;
final int minimum = Math.min(maximum, Math.max(publisher.initialMin, 1024));
final int maximum = reservedMax;
final int minimum = Math.min(maximum, Math.max(publisher.initialMin, 1024) + publisher.initialPad);

int valueClaimed = maximum;

if (canPublish && publisher.debitorIndex != NO_DEBITOR_INDEX && publishablePayloadSize != 0)
if (canPublish && publisher.debitorIndex != NO_DEBITOR_INDEX && lengthMax != 0)
{
valueClaimed =
publisher.debitor.claim(traceId, publisher.debitorIndex, publisher.initialId, minimum, maximum, 0);
}

if (canPublish && (valueClaimed != 0 || payload.sizeof() == 0))
int sizeClaimed = valueClaimed - publisher.initialPad;

final OctetsFW payload = payloadRO.wrap(buffer, offset, offset + sizeClaimed);

if (canPublish && (sizeClaimed != 0 || payload.sizeof() == 0))
{
if (server.publishPayloadDeferred == 0)
{
Expand All @@ -1525,12 +1528,12 @@ private int decodePublishPayload(
server.onDecodePublishPayload(traceId, authorization, valueClaimed, server.decodedPacketId, server.decodedQos,
server.decodedFlags, server.decodedExpiryInterval, server.decodedContentType, server.decodedPayloadFormat,
server.decodedResponseTopic, server.decodedCorrelationData, server.decodedUserProperties,
payload, payload.offset(), payload.offset() + valueClaimed, publisher.contentType);
payload, payload.offset(), payload.offset() + sizeClaimed, publisher.contentType);

progress = payload.offset() + valueClaimed;
progress = payload.offset() + sizeClaimed;


if (server.publishPayloadBytes == 0)
if (server.decodeablePublishPayloadBytes == 0)
{
server.decoder = decodePacketTypeByVersion.get(server.version);
}
Expand Down Expand Up @@ -2447,7 +2450,7 @@ private final class MqttServer
private long decodePublisherKey;
private int decodeablePacketBytes;
private int publishPayloadDeferred;
public int publishPayloadBytes;
public int decodeablePublishPayloadBytes;
private int willPayloadDeferred;
public int willPayloadBytes;

Expand Down Expand Up @@ -3266,7 +3269,7 @@ private void onDecodePublishPayload(

if (publishPayloadDeferred == 0)
{
publishPayloadDeferred = publishPayloadBytes - length;
publishPayloadDeferred = decodeablePublishPayloadBytes - length;
final Flyweight dataEx = mqttPublishDataExRW.wrap(dataExtBuffer, 0, dataExtBuffer.capacity())
.typeId(mqttTypeId)
.publish(p ->
Expand All @@ -3292,7 +3295,7 @@ private void onDecodePublishPayload(
{
stream.doPublishData(traceId, authorization, reserved, packetId, payload, dataFlags,
offset, limit, dataEx);
publishPayloadBytes -= length;
decodeablePublishPayloadBytes -= length;
}
}
else
Expand All @@ -3305,13 +3308,13 @@ private void onDecodePublishPayload(
{
stream.doPublishData(traceId, authorization, reserved, packetId, payload, dataFlags,
offset, limit, EMPTY_OCTETS);
publishPayloadBytes -= length;
decodeablePublishPayloadBytes -= length;
}
}
}
else
{
publishPayloadBytes -= length;
decodeablePublishPayloadBytes -= length;
}
}
}
Expand Down Expand Up @@ -6009,7 +6012,7 @@ private void onPublishExpiredSignal(
final long traceId = signal.traceId();

final long now = System.currentTimeMillis();
if (now >= publishExpiresAt && publishPayloadBytes == 0)
if (now >= publishExpiresAt && decodeablePublishPayloadBytes == 0)
{
doPublishAppEnd(traceId);
}
Expand Down Expand Up @@ -6131,7 +6134,7 @@ private void cleanupAbort(

private int initialBudget()
{
return initialMax - (int)(initialSeq - initialAck) - initialPad;
return initialMax - (int)(initialSeq - initialAck);
}
}

Expand Down