From 86cc2818ad873ff5ee9406d1db46727f381e2b80 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Fri, 20 Oct 2023 11:15:57 +0200 Subject: [PATCH] Fix flow control bug in mqtt-kakfa publish --- .../mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java index 258fa73218..da8477f644 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaPublishFactory.java @@ -320,7 +320,7 @@ private void onMqttData( assert acknowledge <= sequence; assert sequence >= initialSeq; - initialSeq = sequence; + initialSeq = sequence + reserved; assert initialAck <= initialSeq; @@ -604,6 +604,8 @@ private void doMqttWindow( initialAck = newInitialAck; initialMax = newInitialMax; + assert initialAck <= initialSeq; + doWindow(mqtt, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, authorization, budgetId, padding, 0, capabilities); }