Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility about statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
…ring
  • Loading branch information
edenhill committed Dec 21, 2022
commit 34316a88b36cbd0605b497ec4e79b864c88a3f7c
7 changes: 5 additions & 2 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3298,7 +3298,9 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_throttle_time(reply);

offsets = rd_kafka_buf_read_topic_partitions(
reply, 0, rd_true /*read_offset*/, rd_true /*read_part_errs*/);
reply, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);
if (!offsets)
rd_kafka_buf_parse_fail(reply,
"Failed to parse topic partitions");
Expand Down Expand Up @@ -3879,7 +3881,8 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
rd_kafka_buf_read_throttle_time(reply);

partitions = rd_kafka_buf_read_topic_partitions(
reply, 16, rd_false /*no offset */, rd_true /*read error*/);
reply, 16, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);
if (!partitions) {
rd_snprintf(errstr, errstr_size,
"Failed to parse OffsetDeleteResponse partitions");
Expand Down
5 changes: 2 additions & 3 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
rd_kafka_buf_write_topic_partitions(
rkbuf, owned_partitions,
rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/, rd_false /*don't write offsets*/,
rd_false /*don't write epoch*/,
rd_false /*don't write metadata*/);
rd_false /*any offset*/,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
Expand Down
4 changes: 2 additions & 2 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,7 @@ static void rd_kafka_cgrp_handle_SyncGroup_memberstate(

rd_kafka_buf_read_i16(rkbuf, &Version);
if (!(assignment = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, rd_false, rd_false)))
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION)))
goto err_parse;
rd_kafka_buf_read_bytes(rkbuf, &UserData);

Expand Down Expand Up @@ -1801,7 +1801,7 @@ static int rd_kafka_group_MemberMetadata_consumer_read(

if (Version >= 1 &&
!(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, rd_false, rd_false)))
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION)))
goto err;

rd_kafka_buf_destroy(rkbuf);
Expand Down
158 changes: 103 additions & 55 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -201,25 +201,24 @@ int rd_kafka_err_action(rd_kafka_broker_t *rkb,
* @brief Read a list of topic+partitions+extra from \p rkbuf.
*
* @param rkbuf buffer to read from
* @param estimated_part_cnt estimated number of partitions to read.
* @param read_part_errs whether or not to read an error per partition.
* @param fields An array of fields to read from the buffer and set on
* the rktpar object, in the specified order, must end
* with RD_KAFKA_TOPIC_PARTITION_FIELD_END.
*
* @returns a newly allocated list on success, or NULL on parse error.
*/
rd_kafka_topic_partition_list_t *
rd_kafka_buf_read_topic_partitions(rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
rd_bool_t read_offset,
rd_bool_t read_part_errs) {
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions0(
rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
const rd_kafka_topic_partition_field_t *fields) {
const int log_decode_errors = LOG_ERR;
int16_t ErrorCode = 0;
int32_t TopicArrayCnt;
rd_kafka_topic_partition_list_t *parts = NULL;

rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);

parts = rd_kafka_topic_partition_list_new(
RD_MAX(TopicArrayCnt, (int)estimated_part_cnt));
RD_MAX(TopicArrayCnt * 4, (int)estimated_part_cnt));

while (TopicArrayCnt-- > 0) {
rd_kafkap_str_t kTopic;
Expand All @@ -233,24 +232,55 @@ rd_kafka_buf_read_topic_partitions(rd_kafka_buf_t *rkbuf,
RD_KAFKAP_STR_DUPA(&topic, &kTopic);

while (PartArrayCnt-- > 0) {
int32_t Partition;
int64_t Offset;
int32_t Partition = -1, Epoch = -1234;
int64_t Offset = -1234;
int16_t ErrorCode = 0;
rd_kafka_topic_partition_t *rktpar;
int fi;

rd_kafka_buf_read_i32(rkbuf, &Partition);
/*
* Read requested fields
*/
for (fi = 0;
fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
fi++) {
switch (fields[fi]) {
case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
rd_kafka_buf_read_i32(rkbuf,
&Partition);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
rd_kafka_buf_read_i64(rkbuf, &Offset);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
rd_kafka_buf_read_i32(rkbuf, &Epoch);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
rd_kafka_buf_read_i16(rkbuf,
&ErrorCode);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
rd_assert(!*"metadata not implemented");
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
break;
}
}

rktpar = rd_kafka_topic_partition_list_add(parts, topic,
Partition);

if (read_offset) {
rd_kafka_buf_read_i64(rkbuf, &Offset);
/* Use dummy sentinel values that are unlikely to be
* seen from the broker to know if we are to set these
* fields or not. */
if (Offset != -1234)
rktpar->offset = Offset;
}
if (Epoch != -1234)
rd_kafka_topic_partition_set_leader_epoch(
rktpar, Epoch);
rktpar->err = ErrorCode;

if (read_part_errs) {
rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
rktpar->err = ErrorCode;
}

rd_kafka_buf_skip_tags(rkbuf);
}
Expand All @@ -275,22 +305,18 @@ rd_kafka_buf_read_topic_partitions(rd_kafka_buf_t *rkbuf,
*
* @remark The \p parts list MUST be sorted.
*/
int rd_kafka_buf_write_topic_partitions(
int rd_kafka_buf_write_topic_partitions0(
rd_kafka_buf_t *rkbuf,
const rd_kafka_topic_partition_list_t *parts,
rd_bool_t skip_invalid_offsets,
rd_bool_t only_invalid_offsets,
rd_bool_t write_Offset,
rd_bool_t write_Epoch,
rd_bool_t write_Metadata) {
const rd_kafka_topic_partition_field_t *fields) {
size_t of_TopicArrayCnt;
size_t of_PartArrayCnt = 0;
int TopicArrayCnt = 0, PartArrayCnt = 0;
int i;
const char *prev_topic = NULL;
int cnt = 0;
rd_bool_t partition_id_only =
!write_Offset && !write_Epoch && !write_Metadata;

rd_assert(!only_invalid_offsets ||
(only_invalid_offsets != skip_invalid_offsets));
Expand All @@ -300,6 +326,7 @@ int rd_kafka_buf_write_topic_partitions(

for (i = 0; i < parts->cnt; i++) {
const rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
int fi;

if (rktpar->offset < 0) {
if (skip_invalid_offsets)
Expand Down Expand Up @@ -329,36 +356,56 @@ int rd_kafka_buf_write_topic_partitions(
rd_kafka_buf_write_arraycnt_pos(rkbuf);
}

/* Partition */
rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
PartArrayCnt++;

/* Time/Offset */
if (write_Offset) {
rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
}

if (write_Epoch) {
/* CommittedLeaderEpoch */
rd_kafka_buf_write_i32(rkbuf, -1);
/*
* Write requested fields
*/
for (fi = 0; fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
fi++) {
switch (fields[fi]) {
case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
rd_kafka_buf_write_i32(rkbuf,
rktpar->partition);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
rd_kafka_buf_write_i32(
rkbuf,
rd_kafka_topic_partition_get_leader_epoch(
rktpar));
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
rd_kafka_buf_write_i16(rkbuf, rktpar->err);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
/* Java client 0.9.0 and broker <0.10.0 can't
* parse Null metadata fields, so as a
* workaround we send an empty string if
* it's Null. */
if (!rktpar->metadata)
rd_kafka_buf_write_str(rkbuf, "", 0);
else
rd_kafka_buf_write_str(
rkbuf, rktpar->metadata,
rktpar->metadata_size);
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
break;
case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
break;
}
}

if (write_Metadata) {
/* Metadata */
/* Java client 0.9.0 and broker <0.10.0 can't parse
* Null metadata fields, so as a workaround we send an
* empty string if it's Null. */
if (!rktpar->metadata)
rd_kafka_buf_write_str(rkbuf, "", 0);
else
rd_kafka_buf_write_str(rkbuf, rktpar->metadata,
rktpar->metadata_size);
}

/* Tags for partition struct */
if (!partition_id_only)
if (fi > 1)
/* If there was more than one field written
* then this was a struct and thus needs the
* struct suffix tags written. */
rd_kafka_buf_write_tags(rkbuf);

PartArrayCnt++;
cnt++;
}

Expand Down Expand Up @@ -456,7 +503,8 @@ rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf,
int32_t kpartition;
int16_t ErrorCode;
int32_t OffsetArrayCnt;
int64_t Offset = -1;
int64_t Offset = -1;
int32_t LeaderEpoch = -1;
rd_kafka_topic_partition_t *rktpar;

rd_kafka_buf_read_i32(rkbuf, &kpartition);
Expand Down Expand Up @@ -989,8 +1037,8 @@ void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb,
/* Write partition list, filtering out partitions with valid offsets */
PartCnt = rd_kafka_buf_write_topic_partitions(
rkbuf, parts, rd_false /*include invalid offsets*/,
rd_false /*skip valid offsets */, rd_false /*don't write offsets*/,
rd_false /*don't write epoch */, rd_false /*don't write metadata*/);
rd_false /*skip valid offsets */,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);

if (ApiVersion >= 7) {
/* RequireStable */
Expand Down Expand Up @@ -3693,8 +3741,8 @@ rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb,

rd_kafka_buf_write_topic_partitions(
rkbuf, partitions, rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/, rd_true /*do write offsets*/,
rd_false /*don't write epoch*/, rd_false /*don't write metadata*/);
rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET);

/* timeout */
op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
Expand Down
50 changes: 41 additions & 9 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,51 @@ int rd_kafka_err_action(rd_kafka_broker_t *rkb,

const char *rd_kafka_actions2str(int actions);

rd_kafka_topic_partition_list_t *
rd_kafka_buf_read_topic_partitions(rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
rd_bool_t read_offset,
rd_bool_t read_part_errs);
int rd_kafka_buf_write_topic_partitions(

/**
 * @brief Identifies a single per-partition field to read from, or write to,
 *        a protocol buffer.
 *
 * Values are listed in an array, in the order they are to be processed,
 * and the array is terminated by RD_KAFKA_TOPIC_PARTITION_FIELD_END.
 */
typedef enum {
        RD_KAFKA_TOPIC_PARTITION_FIELD_END       = 0, /**< Array end sentinel */
        RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION = 1, /**< int32_t partition */
        RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET    = 2, /**< int64_t offset */
        RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH     = 3, /**< int32_t
                                                       *   leader_epoch */
        RD_KAFKA_TOPIC_PARTITION_FIELD_ERR       = 4, /**< int16_t error code */
        RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA  = 5, /**< str metadata */
        RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP      = 6, /**< No field is read or
                                                       *   written; useful as
                                                       *   the "else" arm of a
                                                       *   ternary expression */
} rd_kafka_topic_partition_field_t;

/**
 * @brief Read a list of topic+partitions+extra fields from \p rkbuf.
 *
 * @param rkbuf buffer to read from.
 * @param estimated_part_cnt estimated number of partitions to read.
 * @param fields array of fields to read for each partition, in the
 *               specified order; must end with
 *               RD_KAFKA_TOPIC_PARTITION_FIELD_END.
 *
 * @returns a newly allocated list on success, or NULL on parse error.
 */
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions0(
rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
const rd_kafka_topic_partition_field_t *fields);

/**
 * @brief Variadic convenience wrapper for
 *        rd_kafka_buf_read_topic_partitions0().
 *
 * The fields passed as trailing arguments are collected, in the given order,
 * into a temporary array which is automatically terminated with
 * RD_KAFKA_TOPIC_PARTITION_FIELD_END. At least one field must be specified.
 *
 * The macro expands to a single expression whose value is the return value
 * of rd_kafka_buf_read_topic_partitions0().
 *
 * @remark The temporary array is a C99 compound literal rather than a local
 *         declared inside a GNU statement expression: this is standard C,
 *         and it introduces no hidden identifier that could shadow a
 *         variable used in the caller's arguments.
 */
#define rd_kafka_buf_read_topic_partitions(rkbuf, estimated_part_cnt, ...)    \
        rd_kafka_buf_read_topic_partitions0(                                   \
            (rkbuf), (estimated_part_cnt),                                     \
            (const rd_kafka_topic_partition_field_t[]) {                       \
                    __VA_ARGS__, RD_KAFKA_TOPIC_PARTITION_FIELD_END})

int rd_kafka_buf_write_topic_partitions0(
rd_kafka_buf_t *rkbuf,
const rd_kafka_topic_partition_list_t *parts,
rd_bool_t skip_invalid_offsets,
rd_bool_t only_invalid_offsets,
rd_bool_t write_Offset,
rd_bool_t write_Epoch,
rd_bool_t write_Metadata);
const rd_kafka_topic_partition_field_t *fields);
/**
 * @brief Variadic convenience wrapper for
 *        rd_kafka_buf_write_topic_partitions0().
 *
 * The fields passed as trailing arguments are collected, in the given order,
 * into a temporary array which is automatically terminated with
 * RD_KAFKA_TOPIC_PARTITION_FIELD_END. At least one field must be specified.
 * RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP may be used as a placeholder, e.g.
 * as one arm of a ternary expression selecting an optional field.
 *
 * The macro expands to a single expression whose value is the return value
 * of rd_kafka_buf_write_topic_partitions0().
 *
 * @remark The temporary array is a C99 compound literal rather than a local
 *         declared inside a GNU statement expression: this is standard C,
 *         and it introduces no hidden identifier that could shadow a
 *         variable used in the caller's arguments.
 */
#define rd_kafka_buf_write_topic_partitions(rkbuf, parts, skip_invalid,       \
                                            only_invalid, ...)                 \
        rd_kafka_buf_write_topic_partitions0(                                  \
            (rkbuf), (parts), (skip_invalid), (only_invalid),                  \
            (const rd_kafka_topic_partition_field_t[]) {                       \
                    __VA_ARGS__, RD_KAFKA_TOPIC_PARTITION_FIELD_END})

rd_kafka_resp_err_t
rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
Expand Down
3 changes: 1 addition & 2 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1866,8 +1866,7 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata(
rd_assert(state->prev_assignment != NULL);
rd_kafka_buf_write_topic_partitions(
rkbuf, state->prev_assignment, rd_false /*skip invalid offsets*/,
rd_false /*any offset*/, rd_false /*write offsets*/,
rd_false /*write epoch*/, rd_false /*write metadata*/);
rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
rd_kafka_buf_write_i32(rkbuf, state->generation_id);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
Expand Down
13 changes: 8 additions & 5 deletions src/rdkafka_txnmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1496,8 +1496,9 @@ static void rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t *rk,

rd_kafka_buf_read_throttle_time(rkbuf);

partitions =
rd_kafka_buf_read_topic_partitions(rkbuf, 0, rd_false, rd_true);
partitions = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);
if (!partitions)
goto err_parse;

Expand Down Expand Up @@ -1706,9 +1707,11 @@ rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb,
/* Write per-partition offsets list */
cnt = rd_kafka_buf_write_topic_partitions(
rkbuf, rko->rko_u.txn.offsets, rd_true /*skip invalid offsets*/,
rd_false /*any offset*/, rd_true /*write offsets*/,
ApiVersion >= 2 /*write Epoch (-1) */, rd_true /*write Metadata*/);

rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
ApiVersion >= 2 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH
: RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA);
if (!cnt) {
/* No valid partition offsets, don't commit. */
rd_kafka_buf_destroy(rkbuf);
Expand Down