KIP-848 migration guide #386
emasab wants to merge 1 commit into master from dev_kip-848_migration-guide
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
# Next-generation consumer group rebalance protocol migration guide | ||
|
||
Starting with **confluent-kafka-javascript 1.6.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**. | ||
|
||
**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. The new protocol comes with a few contract changes, and some of them may be breaking. The `group.protocol` configuration property determines whether the new `consumer` protocol or the older `classic` protocol is used. It defaults to `classic` if not provided. | ||
|
||
# Overview | ||
|
||
- **What changed:** | ||
|
||
The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. | ||
|
||
- **Requirements:** | ||
|
||
- Broker version **4.0.0+** | ||
- confluent-kafka-javascript version **1.6.0+**: GA (production-ready) | ||
|
||
- **Enablement (client-side):** | ||
|
||
- `group.protocol=consumer` | ||
- `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`) | ||
|
||
# Available Features | ||
|
||
All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported including: | ||
|
||
- Subscription to one or more topics, including **regular expression (regex) subscriptions** | ||
- Rebalance callbacks (**incremental only**) | ||
- Static group membership | ||
- Configurable remote assignor | ||
- Enforced max poll interval | ||
- Upgrade from `classic` protocol or downgrade from `consumer` protocol | ||
- AdminClient changes as per KIP | ||
|
||
# Contract Changes | ||
|
||
## Client Configuration changes | ||
|
||
| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | | ||
|--------------------------------------------------|-------------------------------------------------------| | ||
| `partition.assignment.strategy` | `group.remote.assignor` | | ||
| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` | | ||
| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` | | ||
| `group.protocol.type` | Not used in the new protocol | | ||
|
||
**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol. | ||
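The table above can be applied mechanically to an existing client configuration. The following is a minimal sketch, not part of the library: `toConsumerProtocolConfig` and `CLASSIC_ONLY_KEYS` are hypothetical names, and the broker address and group id are placeholders. It drops the classic-only properties, sets `group.protocol`, and returns the dropped keys so they can be logged or moved to the broker configuration.

```javascript
// Client properties the table lists as unused under the KIP-848 protocol.
const CLASSIC_ONLY_KEYS = [
  'partition.assignment.strategy',
  'session.timeout.ms',
  'heartbeat.interval.ms',
  'group.protocol.type',
];

// Hypothetical helper: returns a new config for the `consumer` protocol plus
// the list of classic-only keys that were dropped from the input.
function toConsumerProtocolConfig(classicConfig, remoteAssignor) {
  const config = {};
  const dropped = [];
  for (const [key, value] of Object.entries(classicConfig)) {
    if (CLASSIC_ONLY_KEYS.includes(key)) {
      dropped.push(key);
    } else {
      config[key] = value;
    }
  }
  config['group.protocol'] = 'consumer';
  // Optional: when omitted, the broker picks the assignor.
  if (remoteAssignor !== undefined) {
    config['group.remote.assignor'] = remoteAssignor;
  }
  return { config, dropped };
}

const { config, dropped } = toConsumerProtocolConfig(
  {
    'bootstrap.servers': 'localhost:9092', // placeholder address
    'group.id': 'example-group', // placeholder group id
    'partition.assignment.strategy': 'range',
    'session.timeout.ms': 45000,
  },
  'range',
);
console.log(config, dropped);
```

The sketch deliberately does not translate `partition.assignment.strategy` values into remote assignor names, because the guide defines no such mapping; the assignor is passed explicitly instead.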
|
||
## Rebalance Callback Changes | ||
|
||
- The **protocol is fully incremental** in KIP-848. | ||
- In the **rebalance callbacks**, calling the following functions is **optional**; if you do not call them, the client performs the assign/unassign internally: | ||
- `assign(partitions)` to assign new partitions | ||
- `unassign(partitions)` to revoke partitions | ||
- ⚠️ The `partitions` list passed to `assign()` and `unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case in the classic protocol. | ||
- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. | ||
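Because the callback now receives only deltas, application code that needs the full assignment has to accumulate it. A minimal sketch of one way to do that follows; `AssignmentTracker` and `tpKey` are hypothetical names, and the `{ topic, partition }` shape of each entry is an assumption about the objects passed to the callback.

```javascript
// Key a topic-partition as "topic:partition" so it can be stored in a Set.
const tpKey = ({ topic, partition }) => `${topic}:${partition}`;

// Hypothetical tracker: rebuilds the full assignment from incremental deltas.
class AssignmentTracker {
  constructor() {
    this.owned = new Set();
  }
  // Call with the partitions passed to the callback on assignment.
  onAssigned(partitions) {
    partitions.forEach((tp) => this.owned.add(tpKey(tp)));
  }
  // Call with the partitions passed to the callback on revocation.
  onRevoked(partitions) {
    partitions.forEach((tp) => this.owned.delete(tpKey(tp)));
  }
  // Sorted snapshot of everything currently owned.
  current() {
    return [...this.owned].sort();
  }
}

const tracker = new AssignmentTracker();
tracker.onAssigned([{ topic: 'orders', partition: 0 }, { topic: 'orders', partition: 1 }]);
tracker.onAssigned([{ topic: 'orders', partition: 2 }]);
tracker.onRevoked([{ topic: 'orders', partition: 0 }]);
console.log(tracker.current()); // [ 'orders:1', 'orders:2' ]
```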
|
||
## Static Group Membership | ||
|
||
- Duplicate `group.instance.id` handling: | ||
- **Newly joining member** is fenced with **`ErrorCodes.ERR_UNRELEASED_INSTANCE_ID` (fatal)**. | ||
- (Classic protocol fenced the **existing** member instead.) | ||
- Implications: | ||
- Ensure only **one active instance per** `group.instance.id`. | ||
- Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. | ||
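Since a duplicate `group.instance.id` is now fatal for the joining member, it may be worth checking planned deployments before rollout. The sketch below is one hypothetical pre-deployment check (`duplicateInstanceIds` is not a library function); it only inspects configuration objects and cannot see members that are already running.

```javascript
// Hypothetical check: returns every group.instance.id that is configured on
// more than one of the planned consumer configurations.
function duplicateInstanceIds(configs) {
  const counts = new Map();
  for (const cfg of configs) {
    const id = cfg['group.instance.id'];
    if (id === undefined) continue; // dynamic members carry no instance id
    counts.set(id, (counts.get(id) || 0) + 1);
  }
  return [...counts].filter(([, n]) => n > 1).map(([id]) => id);
}

const duplicates = duplicateInstanceIds([
  { 'group.id': 'example-group', 'group.instance.id': 'worker-0' },
  { 'group.id': 'example-group', 'group.instance.id': 'worker-1' },
  { 'group.id': 'example-group', 'group.instance.id': 'worker-0' },
  { 'group.id': 'example-group' },
]);
console.log(duplicates); // [ 'worker-0' ]
```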
|
||
## Session Timeout & Fetching | ||
|
||
- **Session timeout is broker-controlled**: | ||
- If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. | ||
- The consumer is fenced only once it receives a heartbeat response from the Coordinator. | ||
- In the classic protocol, the client stopped fetching when session timeout expired. | ||
|
||
## Closing / Auto-Commit | ||
|
||
- On `disconnect()`: | ||
- Member retries committing offsets until a timeout expires. | ||
- Currently uses the **default remote session timeout** (45s). | ||
- Future **KIP-1092** will allow custom commit timeouts. | ||
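Given that `disconnect()` may keep retrying the final commit for up to 45 s, a shutdown handler should allow at least that long before forcing an exit. The following is a sketch under stated assumptions: `shutdownGracefully` and `DISCONNECT_GRACE_MS` are hypothetical, the 50 s value is an illustrative margin on top of the 45 s default, and `consumer` is any object exposing an asynchronous `disconnect()`.

```javascript
// Assumption: 45 s default remote session timeout plus a small margin.
const DISCONNECT_GRACE_MS = 50000;

// Hypothetical helper: waits for disconnect() but gives up after graceMs.
// Resolves to 'disconnected' or 'timed-out'.
async function shutdownGracefully(consumer, graceMs = DISCONNECT_GRACE_MS) {
  let timer;
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), graceMs);
  });
  try {
    return await Promise.race([
      consumer.disconnect().then(() => 'disconnected'),
      timedOut,
    ]);
  } finally {
    clearTimeout(timer); // do not keep the process alive once settled
  }
}

// Possible wiring, commented out so the sketch has no side effects:
// process.once('SIGTERM', async () => {
//   const outcome = await shutdownGracefully(consumer);
//   process.exit(outcome === 'disconnected' ? 0 : 1);
// });
```

If the process is managed by an orchestrator, its termination grace period would need to be at least as long as the value chosen here.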
|
||
## Error Handling Changes | ||
|
||
Errors are reported through the logger. Reporting through the error callback is not available at the moment. | ||
|
||
- `ErrorCodes.ERR_UNKNOWN_TOPIC_OR_PART` (**subscription case**): | ||
- No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. | ||
- `ErrorCodes.ERR_TOPIC_AUTHORIZATION_FAILED`: | ||
- Reported once per heartbeat or subscription change, even if only one topic is unauthorized. | ||
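Because subscribing to a missing topic no longer produces an error, applications that relied on that error as a typo guard may want an explicit check. A minimal sketch follows; `missingTopics` is a hypothetical helper, and how the list of existing topic names is obtained (for example through an admin client) is left out on purpose.

```javascript
// Hypothetical helper: returns the literal topic names in `subscription` that
// are absent from `existingTopics`. RegExp entries are skipped because a
// pattern may legitimately match nothing yet.
function missingTopics(subscription, existingTopics) {
  const existing = new Set(existingTopics);
  return subscription.filter((t) => typeof t === 'string' && !existing.has(t));
}

const missing = missingTopics(
  ['orders', 'payments', /^audit-/],
  ['orders', 'audit-eu'],
);
console.log(missing); // [ 'payments' ]
```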
|
||
## Summary of Key Differences (Classic vs Next-Gen) | ||
|
||
- **Assignment:** Classic: calculated by the **Group Leader (consumer)**; KIP-848: calculated by the **Group Coordinator (broker)** | ||
- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range | ||
- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs | ||
- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id` | ||
- **Session timeout:** Classic: enforced on client; KIP-848: enforced on broker | ||
- **Auto-commit on disconnect:** Classic: stops at client session timeout; KIP-848: retries until a default timeout (45s) is reached. | ||
- **Unknown topics:** KIP-848 does not return error on subscription if topic missing | ||
- **Upgrade/Downgrade:** KIP-848 supports upgrading a group from `classic` to `consumer` and downgrading it from `consumer` back to `classic` | ||
|
||
# Minimal Example Config | ||
|
||
## Classic Protocol | ||
|
||
``` properties | ||
# Optional; default is 'classic' | ||
group.protocol=classic | ||
|
||
partition.assignment.strategy=<range,roundrobin,sticky> | ||
session.timeout.ms=45000 | ||
heartbeat.interval.ms=15000 | ||
``` | ||
|
||
## Next-Gen Protocol / KIP-848 | ||
|
||
``` properties | ||
group.protocol=consumer | ||
|
||
# Optional: select a remote assignor | ||
# Valid options currently: 'uniform' or 'range' | ||
# group.remote.assignor=<uniform,range> | ||
# If unset, broker chooses the assignor (default: 'uniform') | ||
|
||
# Session & heartbeat now controlled by broker: | ||
# group.consumer.session.timeout.ms | ||
# group.consumer.heartbeat.interval.ms | ||
``` | ||
|
||
# Rebalance Callback Migration | ||
|
||
## Range Assignor (Classic) | ||
|
||
``` javascript | ||
// Rebalance callback for the range assignor (classic protocol) | ||
rebalance_cb: function (err, assignment, assignmentFns) { | ||
  if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { | ||
    // `assignment` is the full new assignment | ||
    assignmentFns.assign(assignment); | ||
  } else { | ||
    // Revocation: drop the whole current assignment | ||
    assignmentFns.unassign(); | ||
  } | ||
} | ||
``` | ||
|
||
## Incremental Assignor (KIP-848 with any remote assignor type) | ||
|
||
``` javascript | ||
// Rebalance callback for the incremental protocol (KIP-848) | ||
rebalance_cb: function (err, assignment, assignmentFns) { | ||
  if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { | ||
    // `assignment` holds only the partitions being added | ||
    assignmentFns.assign(assignment); | ||
  } else { | ||
    // `assignment` holds only the partitions being revoked | ||
    assignmentFns.unassign(assignment); | ||
  } | ||
} | ||
``` | ||
|
||
**Note:** The `assignment` list contains **only partitions being added or revoked**, not the full partition list as in the classic case. | ||
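During a rolling upgrade the same application code may run under either protocol, so it can help to build the callback from the configured `group.protocol`. The sketch below combines the two snippets above; `makeRebalanceCb` is a hypothetical factory, and `errorCodes` stands in for the library's `ErrorCodes` object (the stub values in the usage lines are placeholders, not the real codes).

```javascript
// Hypothetical factory: returns a rebalance callback for the given protocol.
function makeRebalanceCb(groupProtocol, errorCodes) {
  return function rebalanceCb(err, assignment, assignmentFns) {
    if (err.code === errorCodes.ERR__ASSIGN_PARTITIONS) {
      assignmentFns.assign(assignment);
    } else if (groupProtocol === 'consumer') {
      assignmentFns.unassign(assignment); // only the revoked partitions
    } else {
      assignmentFns.unassign(); // classic: drop the whole assignment
    }
  };
}

// Usage with stubs; in a real client, pass the library's ErrorCodes instead.
const stubCodes = { ERR__ASSIGN_PARTITIONS: 'assign', ERR__REVOKE_PARTITIONS: 'revoke' };
const calls = [];
const stubFns = {
  assign: (p) => calls.push(['assign', p]),
  unassign: (p) => calls.push(['unassign', p]),
};
const revoked = [{ topic: 'orders', partition: 0 }];
makeRebalanceCb('consumer', stubCodes)({ code: 'revoke' }, revoked, stubFns);
makeRebalanceCb('classic', stubCodes)({ code: 'revoke' }, revoked, stubFns);
console.log(calls);
```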
|
||
# Upgrade and Downgrade | ||
|
||
- A group made up entirely of `classic` consumers runs under the classic protocol. | ||
- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. | ||
- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. | ||
- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. | ||
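The rules above can be read as a simple function of the members' protocols. The following toy model only restates those rules for illustration; the actual decision is made by the broker, `effectiveGroupProtocol` is a hypothetical name, and treating an empty group as `classic` is an assumption of the sketch.

```javascript
// Toy model: a group runs the consumer protocol as soon as at least one
// member uses it, and the classic protocol otherwise.
function effectiveGroupProtocol(memberProtocols) {
  return memberProtocols.includes('consumer') ? 'consumer' : 'classic';
}

// A rolling upgrade of a three-member group, one member at a time.
const rollingUpgrade = [
  ['classic', 'classic', 'classic'],
  ['consumer', 'classic', 'classic'],
  ['consumer', 'consumer', 'consumer'],
];
console.log(rollingUpgrade.map((members) => effectiveGroupProtocol(members)));
// [ 'classic', 'consumer', 'consumer' ]
```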
|
||
# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)) | ||
|
||
1. Upgrade to **confluent-kafka-javascript ≥ 1.6.0** (GA release) | ||
2. Run against **Kafka brokers ≥ 4.0.0** | ||
3. Set `group.protocol=consumer` | ||
4. Optionally set `group.remote.assignor` to `uniform` or `range`; if left unset, the broker chooses the assignor (default: `uniform`) | ||
5. Replace deprecated configs with new ones | ||
6. Update rebalance callbacks, as they are now **incremental** | ||
7. Review static membership handling (`group.instance.id`) | ||
8. Ensure proper shutdown to avoid fencing issues | ||
9. Adjust error handling for unknown topics and authorization failures |
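The first five checklist items can be verified mechanically. The sketch below is a hypothetical pre-flight check (`versionAtLeast` and `migrationIssues` are not library functions); it assumes plain dotted numeric version strings and that the client and broker versions are known to the caller.

```javascript
// Returns true if dotted numeric version `a` is greater than or equal to `b`.
function versionAtLeast(a, b) {
  const pa = a.split('.').map(Number);
  const pb = b.split('.').map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] || 0) - (pb[i] || 0);
    if (diff !== 0) return diff > 0;
  }
  return true;
}

// Hypothetical check covering checklist items 1 to 5.
function migrationIssues({ clientVersion, brokerVersion, config }) {
  const issues = [];
  if (!versionAtLeast(clientVersion, '1.6.0')) issues.push('client older than 1.6.0');
  if (!versionAtLeast(brokerVersion, '4.0.0')) issues.push('broker older than 4.0.0');
  if (config['group.protocol'] !== 'consumer') issues.push('group.protocol is not consumer');
  const assignor = config['group.remote.assignor'];
  if (assignor !== undefined && !['uniform', 'range'].includes(assignor)) {
    issues.push(`unsupported group.remote.assignor: ${assignor}`);
  }
  const deprecated = ['partition.assignment.strategy', 'session.timeout.ms',
    'heartbeat.interval.ms', 'group.protocol.type'];
  for (const key of deprecated) {
    if (key in config) issues.push(`deprecated property set: ${key}`);
  }
  return issues;
}

console.log(migrationIssues({
  clientVersion: '1.5.2',
  brokerVersion: '4.0.0',
  config: { 'group.protocol': 'consumer', 'session.timeout.ms': 45000 },
}));
// [ 'client older than 1.6.0', 'deprecated property set: session.timeout.ms' ]
```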