-
Notifications
You must be signed in to change notification settings - Fork 70
Qos2 idempotent producer #733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
jfallows
merged 44 commits into
aklivity:feature/mqtt-kafka
from
bmaidics:qos2_idempontent
Feb 6, 2024
Merged
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
901f0b2
separating publish streams based on qos
bmaidics 79ddafd
Fix test
bmaidics 1265671
Checkpoint
bmaidics 7847580
Checkpoint
bmaidics 7895cd5
check
bmaidics f831ff8
start of idempotent work
bmaidics 412019a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into featu…
bmaidics 167a33a
Optimize memory allocation for mqtt-kafka offset tracking (#694)
bmaidics cf08e0f
checkpoint
bmaidics 80c0e09
checkpoint
bmaidics 62db2cb
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics 49a7b35
Checkpoint with retained offsetCommit stream
bmaidics 92b262a
checkpoint
bmaidics 29d24f7
checkpoint
bmaidics f9fe624
mqtt-kafka checkpoint
bmaidics 5195271
checkpoint
bmaidics 99f1c1f
checkpoint
bmaidics 8db664d
Fixes
bmaidics 7038c04
Fix flaky test
bmaidics e00dd8a
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics e3d92ee
fixes
bmaidics 31e26ed
fix dump
attilakreiner f4b27c1
Fix init produce id request
akrambek 9c5fd24
fix
bmaidics 35c6976
Fix bug
bmaidics a3c7e09
Fix
bmaidics 5cf9068
Don't flush early if the sequence number is not set
akrambek bc38ac7
Draft
bmaidics 724bd70
checkpoint
bmaidics 7df5123
Fixes
bmaidics 4204bc1
Adrress review comments
bmaidics 2a0204f
Merge remote-tracking branch 'upstream/feature/mqtt-kafka' into qos2_…
bmaidics ebd9cee
Merge fixes
bmaidics 8988e6b
Include producerId and producerEpoch into cache entry
akrambek 739f79d
fix dump
attilakreiner 878ec78
Fix qos2 large message
bmaidics 0563e13
Fix typo
akrambek 7f2ca2a
Merge branch 'qos2_idempontent' of github.com:bmaidics/zilla into qos…
akrambek 7d9afee
reviews
bmaidics c274636
more feedback
bmaidics 215791e
checkpoint
bmaidics d3635b3
Refactor
bmaidics 007e8d1
Adjust code coverage ratio
jfallows ccae27c
Ignore IT that fails only on GitHub Actions, see issue #786
jfallows File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Adrress review comments
- Loading branch information
commit 4204bc11b025f78b010cb7cfd6b1bdfd6778265e
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
...a/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/PublishClientMetadata.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /* | ||
| * Copyright 2021-2023 Aklivity Inc | ||
| * | ||
| * Licensed under the Aklivity Community License (the "License"); you may not use | ||
| * this file except in compliance with the License. You may obtain a copy of the | ||
| * License at | ||
| * | ||
| * https://www.aklivity.io/aklivity-community-license/ | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations under the License. | ||
| */ | ||
| package io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.stream; | ||
|
|
||
| import org.agrona.collections.Int2ObjectHashMap; | ||
| import org.agrona.collections.IntArrayList; | ||
| import org.agrona.collections.Long2LongHashMap; | ||
| import org.agrona.collections.Long2ObjectHashMap; | ||
|
|
||
| public class PublishClientMetadata | ||
|
bmaidics marked this conversation as resolved.
Outdated
|
||
| { | ||
| final Long2ObjectHashMap<PublishOffsetMetadata> offsets; | ||
| final Int2ObjectHashMap<KafkaTopicPartition> partitions; | ||
| final Int2ObjectHashMap<KafkaTopicPartition> retainedPartitions; | ||
| final Long2LongHashMap leaderEpochs; | ||
|
|
||
| KafkaGroup group; | ||
|
|
||
| public PublishClientMetadata( | ||
| Long2ObjectHashMap<PublishOffsetMetadata> offsets, | ||
| Int2ObjectHashMap<KafkaTopicPartition> partitions, | ||
| Int2ObjectHashMap<KafkaTopicPartition> retainedPartitions, | ||
| Long2LongHashMap leaderEpochs) | ||
| { | ||
| this.offsets = offsets; | ||
| this.partitions = partitions; | ||
| this.retainedPartitions = retainedPartitions; | ||
| this.leaderEpochs = leaderEpochs; | ||
| } | ||
|
|
||
| public static final class KafkaGroup | ||
| { | ||
| public final String instanceId; | ||
| public final String groupId; | ||
| public final String memberId; | ||
| public final int generationId; | ||
|
|
||
| KafkaGroup( | ||
| String instanceId, | ||
| String groupId, | ||
| String memberId, | ||
| int generationId) | ||
| { | ||
| this.instanceId = instanceId; | ||
| this.groupId = groupId; | ||
| this.memberId = memberId; | ||
| this.generationId = generationId; | ||
| } | ||
| } | ||
|
|
||
| public static final class KafkaTopicPartition | ||
| { | ||
| public final String topic; | ||
| public final int partitionId; | ||
|
|
||
| KafkaTopicPartition( | ||
| String topic, | ||
| int partitionId) | ||
| { | ||
| this.topic = topic; | ||
| this.partitionId = partitionId; | ||
| } | ||
| } | ||
|
|
||
| public static final class PublishOffsetMetadata | ||
|
bmaidics marked this conversation as resolved.
Outdated
|
||
| { | ||
| public final long producerId; | ||
| public final short producerEpoch; | ||
| public final IntArrayList packetIds; | ||
|
|
||
| public long sequence; | ||
|
|
||
| PublishOffsetMetadata( | ||
| long producerId, | ||
| short producerEpoch) | ||
| { | ||
| this.sequence = 1; | ||
| this.producerId = producerId; | ||
| this.producerEpoch = producerEpoch; | ||
| this.packetIds = new IntArrayList(); | ||
|
bmaidics marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| PublishOffsetMetadata( | ||
| long producerId, | ||
| short producerEpoch, | ||
| IntArrayList packetIds) | ||
| { | ||
| this.sequence = 1; | ||
| this.producerId = producerId; | ||
| this.producerEpoch = producerEpoch; | ||
| this.packetIds = packetIds; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.