Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility about statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove next leader epoch
using a fetch state check instead.
Fix stale next fetch position update
when fetch hasn't started yet
  • Loading branch information
emasab committed Mar 10, 2023
commit c70fc0d0bcaca6e0d1637b2f33a36df899e79ecf
45 changes: 25 additions & 20 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1013,10 +1013,8 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
}


/* Validation succeeded, replace leader epoch */
rktp->rktp_leader_epoch = rktp->rktp_next_leader_epoch;
rktpar = &parts->elems[0];
end_offset = rktpar->offset;
rktpar = &parts->elems[0];
end_offset = rktpar->offset;
end_offset_leader_epoch =
rd_kafka_topic_partition_get_leader_epoch(rktpar);

Expand Down Expand Up @@ -1142,6 +1140,28 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
return;
}


if (rktp->rktp_leader_id == -1 || !rktp->rktp_leader ||
rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
"%.*s [%" PRId32
"]: unable to perform offset "
"validation: partition leader not available",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);

rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
return;
}

/* Update next fetch position, that could be stale since last
* fetch start. Only if the offset is real. */
if (rktp->rktp_offsets.fetch_pos.offset > 0) {
rd_kafka_toppar_set_next_fetch_position(
rktp, rktp->rktp_offsets.fetch_pos);
}

/* If the fetch start position does not have an epoch set then
* there is no point in doing validation.
* This is the case for epoch-less seek()s or epoch-less
Expand All @@ -1155,18 +1175,6 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start));
return;
}

if (rktp->rktp_leader_id == -1 || !rktp->rktp_leader ||
rktp->rktp_leader->rkb_source == RD_KAFKA_INTERNAL) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
"%.*s [%" PRId32
"]: unable to perform offset "
"validation: partition leader not available",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);

rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
return;
Expand All @@ -1175,17 +1183,14 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);

rd_kafka_toppar_set_next_fetch_position(rktp,
rktp->rktp_offsets.fetch_pos);

/* Construct and send OffsetForLeaderEpochRequest */
parts = rd_kafka_topic_partition_list_new(1);
rktpar = rd_kafka_topic_partition_list_add(
parts, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rd_kafka_topic_partition_set_leader_epoch(
rktpar, rktp->rktp_next_fetch_start.leader_epoch);
rd_kafka_topic_partition_set_current_leader_epoch(
rktpar, rktp->rktp_next_leader_epoch);
rktpar, rktp->rktp_leader_epoch);
rd_kafka_toppar_keep(rktp); /* for request opaque */

rd_rkb_dbg(rktp->rktp_leader, FETCH, "VALIDATE",
Expand Down
11 changes: 5 additions & 6 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,11 @@ rd_kafka_toppar_t *rd_kafka_toppar_new0(rd_kafka_topic_t *rkt,

rktp = rd_calloc(1, sizeof(*rktp));

rktp->rktp_partition = partition;
rktp->rktp_rkt = rkt;
rktp->rktp_leader_id = -1;
rktp->rktp_broker_id = -1;
rktp->rktp_leader_epoch = -1;
rktp->rktp_next_leader_epoch = -1;
rktp->rktp_partition = partition;
rktp->rktp_rkt = rkt;
rktp->rktp_leader_id = -1;
rktp->rktp_broker_id = -1;
rktp->rktp_leader_epoch = -1;
rd_interval_init(&rktp->rktp_lease_intvl);
rd_interval_init(&rktp->rktp_new_lease_intvl);
rd_interval_init(&rktp->rktp_new_lease_log_intvl);
Expand Down
9 changes: 2 additions & 7 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,8 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
#define RD_KAFKA_TOPPAR_FETCH_IS_STARTED(fetch_state) \
((fetch_state) >= RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY)

int32_t rktp_leader_epoch; /**< Last known partition leader epoch,
* or -1. */
int32_t rktp_next_leader_epoch; /**< Next leader epoch,
* to replace after offset
* validation */


int32_t rktp_leader_epoch; /**< Last known partition leader epoch,
* or -1. */

int32_t rktp_fetch_msg_max_bytes; /* Max number of bytes to
* fetch.
Expand Down
8 changes: 5 additions & 3 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -675,16 +675,18 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
return 0;
}

if (leader_epoch > rktp->rktp_leader_epoch) {
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
need_epoch_validation = rd_true;
else if (leader_epoch > rktp->rktp_leader_epoch) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32
" epoch %" PRId32 " -> leader %" PRId32
" epoch %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktp->rktp_leader_id,
rktp->rktp_leader_epoch, leader_id, leader_epoch);
rktp->rktp_next_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;
rktp->rktp_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;
}

fetching_from_follower =
Expand Down