Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility about statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Differentiate current epoch from offset epoch
  • Loading branch information
emasab committed Feb 27, 2023
commit 3dfa1f393c04492cb004f5c412f51f8f21359c07
31 changes: 28 additions & 3 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,10 @@ void rd_kafka_topic_partition_destroy(rd_kafka_topic_partition_t *rktpar);


/**
* @brief Sets the partition's leader epoch (use -1 to clear).
* @brief Sets the offset leader epoch (use -1 to clear).
*
* @param rktpar Partition object.
* @param leader_epoch Partition leader epoch, use -1 to reset.
* @param leader_epoch Offset leader epoch, use -1 to reset.
*
* @remark See KIP-320 for more information.
*/
Expand All @@ -931,7 +931,7 @@ void rd_kafka_topic_partition_set_leader_epoch(
int32_t leader_epoch);

/**
* @returns the partition's leader epoch, if relevant and known,
* @returns the offset leader epoch, if relevant and known,
* else -1.
*
* @param rktpar Partition object.
Expand All @@ -943,6 +943,31 @@ int32_t rd_kafka_topic_partition_get_leader_epoch(
const rd_kafka_topic_partition_t *rktpar);


/**
* @brief Sets the partition leader current epoch (use -1 to clear).
*
* @param rktpar Partition object.
* @param leader_epoch Partition leader current epoch, use -1 to reset.
*
* @remark See KIP-320 for more information.
*/
RD_EXPORT
void rd_kafka_topic_partition_set_current_leader_epoch(
rd_kafka_topic_partition_t *rktpar,
int32_t leader_epoch);

/**
* @returns the partition leader current epoch, if relevant and known,
* else -1.
*
* @param rktpar Partition object.
*
* @remark See KIP-320 for more information.
*/
RD_EXPORT
int32_t rd_kafka_topic_partition_get_current_leader_epoch(
const rd_kafka_topic_partition_t *rktpar);

/**
* @brief A growable list of Topic+Partitions.
*
Expand Down
8 changes: 4 additions & 4 deletions src/rdkafka_fetcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,7 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {

if (rd_kafka_buf_ApiVersion(rkbuf) >= 9) {
/* CurrentLeaderEpoch */
rd_kafka_buf_write_i32(
rkbuf, rktp->rktp_offsets.fetch_pos.leader_epoch);
rd_kafka_buf_write_i32(rkbuf, rktp->rktp_leader_epoch);
}

/* FetchOffset */
Expand All @@ -878,12 +877,13 @@ int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {

rd_rkb_dbg(rkb, FETCH, "FETCH",
"Fetch topic %.*s [%" PRId32 "] at offset %" PRId64
" (leader epoch %" PRId32 ", v%d)",
" (leader epoch %" PRId32
", current leader epoch %" PRId32 ", v%d)",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rktp->rktp_offsets.fetch_pos.offset,
rktp->rktp_offsets.fetch_pos.leader_epoch,
rktp->rktp_fetch_version);
rktp->rktp_leader_epoch, rktp->rktp_fetch_version);

/* We must have a valid fetch offset when we get here */
rd_dassert(rktp->rktp_offsets.fetch_pos.offset >= 0);
Expand Down
23 changes: 14 additions & 9 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
rd_kafka_toppar_t *rktp = opaque;
rd_kafka_topic_partition_t *rktpar;
int64_t end_offset;
int32_t leader_epoch;
int32_t end_offset_leader_epoch;

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_toppar_destroy(rktp); /* Drop refcnt */
Expand Down Expand Up @@ -1001,11 +1001,12 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
}


rktpar = &parts->elems[0];
end_offset = rktpar->offset;
leader_epoch = rd_kafka_topic_partition_get_leader_epoch(rktpar);
rktpar = &parts->elems[0];
end_offset = rktpar->offset;
end_offset_leader_epoch =
rd_kafka_topic_partition_get_leader_epoch(rktpar);

if (end_offset < 0 || leader_epoch < 0) {
if (end_offset < 0 || end_offset_leader_epoch < 0) {
rd_kafka_offset_reset(rktp, rd_kafka_broker_id(rkb),
rktp->rktp_next_fetch_start,
RD_KAFKA_RESP_ERR__LOG_TRUNCATION,
Expand All @@ -1023,7 +1024,7 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
"broker end offset is %" PRId64
" (leader epoch %" PRId32 ")",
rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start),
end_offset, leader_epoch);
end_offset, end_offset_leader_epoch);

} else {
rd_kafka_error_t *error;
Expand All @@ -1046,7 +1047,7 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
"attempted seek to end offset failed: %s",
rd_kafka_fetch_pos2str(
rktp->rktp_next_fetch_start),
end_offset, leader_epoch,
end_offset, end_offset_leader_epoch,
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
}
Expand All @@ -1064,9 +1065,11 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
"succeeded: broker end offset %" PRId64
" (leader epoch %" PRId32 ")",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, end_offset, leader_epoch);
rktp->rktp_partition, end_offset,
end_offset_leader_epoch);

rktp->rktp_next_fetch_start.leader_epoch = leader_epoch;
rktp->rktp_next_fetch_start.leader_epoch =
end_offset_leader_epoch;
rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
}
Expand Down Expand Up @@ -1175,6 +1178,8 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
parts, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rd_kafka_topic_partition_set_leader_epoch(
rktpar, rktp->rktp_next_fetch_start.leader_epoch);
rd_kafka_topic_partition_set_current_leader_epoch(
rktpar, rktp->rktp_leader_epoch);
rd_kafka_toppar_keep(rktp); /* for request opaque */

rd_rkb_dbg(rktp->rktp_leader, FETCH, "VALIDATE",
Expand Down
78 changes: 59 additions & 19 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ static void rd_kafka_toppar_lag_handle_Offset(rd_kafka_t *rk,
*/
static void rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t *rktp) {
rd_kafka_topic_partition_list_t *partitions;
rd_kafka_topic_partition_t *rktpar;

if (rktp->rktp_wait_consumer_lag_resp)
return; /* Previous request not finished yet */
Expand Down Expand Up @@ -153,9 +154,11 @@ static void rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t *rktp) {
rktp->rktp_wait_consumer_lag_resp = 1;

partitions = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(
partitions, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition)
->offset = RD_KAFKA_OFFSET_BEGINNING;
rktpar = rd_kafka_topic_partition_list_add(
partitions, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
rktpar->offset = RD_KAFKA_OFFSET_BEGINNING;
rd_kafka_topic_partition_set_current_leader_epoch(
rktpar, rktp->rktp_leader_epoch);

/* Ask for oldest offset. The newest offset is automatically
* propagated in FetchResponse.HighwaterMark. */
Expand Down Expand Up @@ -1281,7 +1284,7 @@ void rd_kafka_toppar_offset_fetch(rd_kafka_toppar_t *rktp,
part = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add0(__FUNCTION__, __LINE__, part,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktp);
rktp->rktp_partition, rktp, NULL);

rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH);
rko->rko_rktp = rd_kafka_toppar_keep(rktp);
Expand Down Expand Up @@ -1561,6 +1564,8 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp,
offsets, rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition);
rd_kafka_topic_partition_set_from_fetch_pos(rktpar, query_pos);
rd_kafka_topic_partition_set_current_leader_epoch(
rktpar, rktp->rktp_leader_epoch);

rd_kafka_ListOffsetsRequest(
rkb, offsets,
Expand Down Expand Up @@ -2653,6 +2658,30 @@ void rd_kafka_topic_partition_set_leader_epoch(
parpriv->leader_epoch = leader_epoch;
}

int32_t rd_kafka_topic_partition_get_current_leader_epoch(
const rd_kafka_topic_partition_t *rktpar) {
const rd_kafka_topic_partition_private_t *parpriv;

if (!(parpriv = rktpar->_private))
return -1;

return parpriv->current_leader_epoch;
}

void rd_kafka_topic_partition_set_current_leader_epoch(
rd_kafka_topic_partition_t *rktpar,
int32_t current_leader_epoch) {
rd_kafka_topic_partition_private_t *parpriv;

/* Avoid allocating private_t if clearing the epoch */
if (current_leader_epoch == -1 && !rktpar->_private)
return;

parpriv = rd_kafka_topic_partition_get_private(rktpar);

parpriv->current_leader_epoch = current_leader_epoch;
}

/**
* @brief Set offset and leader epoch from a fetchpos.
*/
Expand Down Expand Up @@ -2726,13 +2755,14 @@ void rd_kafka_topic_partition_list_destroy_free(void *ptr) {
*
* @returns a pointer to the added element.
*/
rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char *func,
int line,
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t partition,
rd_kafka_toppar_t *rktp) {
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add0(
const char *func,
int line,
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t partition,
rd_kafka_toppar_t *rktp,
const rd_kafka_topic_partition_private_t *parpriv) {
rd_kafka_topic_partition_t *rktpar;
if (rktparlist->cnt == rktparlist->size)
rd_kafka_topic_partition_list_grow(rktparlist, 1);
Expand All @@ -2744,9 +2774,20 @@ rd_kafka_topic_partition_list_add0(const char *func,
rktpar->partition = partition;
rktpar->offset = RD_KAFKA_OFFSET_INVALID;

if (rktp)
rd_kafka_topic_partition_get_private(rktpar)->rktp =
rd_kafka_toppar_keep_fl(func, line, rktp);
if (parpriv) {
rd_kafka_topic_partition_private_t *parpriv_copy =
rd_kafka_topic_partition_get_private(rktpar);
if (parpriv->rktp) {
parpriv_copy->rktp =
rd_kafka_toppar_keep_fl(func, line, parpriv->rktp);
}
parpriv_copy->leader_epoch = parpriv->leader_epoch;
parpriv_copy->current_leader_epoch = parpriv->leader_epoch;
} else if (rktp) {
rd_kafka_topic_partition_private_t *parpriv_copy =
rd_kafka_topic_partition_get_private(rktpar);
parpriv_copy->rktp = rd_kafka_toppar_keep_fl(func, line, rktp);
}

return rktpar;
}
Expand All @@ -2757,7 +2798,7 @@ rd_kafka_topic_partition_list_add(rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t partition) {
return rd_kafka_topic_partition_list_add0(
__FUNCTION__, __LINE__, rktparlist, topic, partition, NULL);
__FUNCTION__, __LINE__, rktparlist, topic, partition, NULL, NULL);
}


Expand Down Expand Up @@ -2798,10 +2839,9 @@ void rd_kafka_topic_partition_list_add_copy(
const rd_kafka_topic_partition_t *rktpar) {
rd_kafka_topic_partition_t *dst;

dst = rd_kafka_topic_partition_list_add0(__FUNCTION__, __LINE__,
rktparlist, rktpar->topic,
rktpar->partition, NULL);

dst = rd_kafka_topic_partition_list_add0(
__FUNCTION__, __LINE__, rktparlist, rktpar->topic,
rktpar->partition, NULL, rktpar->_private);
rd_kafka_topic_partition_update(dst, rktpar);
}

Expand Down
54 changes: 28 additions & 26 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,26 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
} rktp_c;
};

/**
* @struct This is a separately allocated glue object used in
* rd_kafka_topic_partition_t._private to allow referencing both
* an rktp and/or a leader epoch. Both are optional.
* The rktp, if non-NULL, owns a refcount.
*
* This glue object is not always set in ._private, but allocated on demand
* as necessary.
*/
typedef struct rd_kafka_topic_partition_private_s {
/** Reference to a toppar. Optional, may be NULL. */
rd_kafka_toppar_t *rktp;
/** Current Leader epoch, if known, else -1.
* this is set when the API needs to send the last epoch known
* by the client. */
int32_t current_leader_epoch;
/** Leader epoch if known, else -1. */
int32_t leader_epoch;
} rd_kafka_topic_partition_private_t;


/**
* Check if toppar is paused (consumer).
Expand Down Expand Up @@ -646,13 +666,14 @@ void rd_kafka_topic_partition_list_destroy_free(void *ptr);
void rd_kafka_topic_partition_list_clear(
rd_kafka_topic_partition_list_t *rktparlist);

rd_kafka_topic_partition_t *
rd_kafka_topic_partition_list_add0(const char *func,
int line,
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t partition,
rd_kafka_toppar_t *rktp);
rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_add0(
const char *func,
int line,
rd_kafka_topic_partition_list_t *rktparlist,
const char *topic,
int32_t partition,
rd_kafka_toppar_t *rktp,
const rd_kafka_topic_partition_private_t *parpriv);

rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_upsert(
rd_kafka_topic_partition_list_t *rktparlist,
Expand Down Expand Up @@ -726,25 +747,6 @@ int rd_kafka_topic_partition_list_cmp(const void *_a,
const void *_b,
int (*cmp)(const void *, const void *));


/**
* @struct This is a separately allocated glue object used in
* rd_kafka_topic_partition_t._private to allow referencing both
* an rktp and/or a leader epoch. Both are optional.
* The rktp, if non-NULL, owns a refcount.
*
* This glue object is not always set in ._private, but allocated on demand
* as necessary.
*/
typedef struct rd_kafka_topic_partition_private_s {
/** Reference to a toppar. Optional, may be NULL. */
rd_kafka_toppar_t *rktp;
/** Leader epoch if known, else -1. */
int32_t leader_epoch;
} rd_kafka_topic_partition_private_t;



/**
* @returns (and creates if necessary) the ._private glue object.
*/
Expand Down
Loading