Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility for statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
MSVC compatibility for statement
expressions
  • Loading branch information
emasab committed Feb 27, 2023
commit 4ac9b0d1c91dbb0a06b3f5e9afb1a2965eebd714
1 change: 1 addition & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef SSIZE_T ssize_t;
#define RD_DEPRECATED __attribute__((deprecated))

#if defined(__clang__) || defined(__GNUC__) || defined(__GNUG__)
#define RD_HAS_STATEMENT_EXPRESSIONS
#define RD_FORMAT(...) __attribute__((format(__VA_ARGS__)))
#else
#define RD_FORMAT(...)
Expand Down
23 changes: 16 additions & 7 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3576,10 +3576,13 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_throttle_time(reply);

offsets = rd_kafka_buf_read_topic_partitions(
reply, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,

const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
offsets = rd_kafka_buf_read_topic_partitions(reply, 0, fields);
if (!offsets)
rd_kafka_buf_parse_fail(reply,
"Failed to parse topic partitions");
Expand Down Expand Up @@ -4159,9 +4162,12 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,

rd_kafka_buf_read_throttle_time(reply);

partitions = rd_kafka_buf_read_topic_partitions(
reply, 16, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);

const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
partitions = rd_kafka_buf_read_topic_partitions(reply, 16, fields);
if (!partitions) {
rd_snprintf(errstr, errstr_size,
"Failed to parse OffsetDeleteResponse partitions");
Expand Down Expand Up @@ -6445,8 +6451,11 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
/* Decreased in rd_kafka_buf_destroy */
rd_kafka_broker_keep(rkb);
rd_kafka_buf_read_i16(rkbuf, &version);
const rd_kafka_topic_partition_field_t fields[] =
{RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
partitions = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, rd_false, rd_false);
rkbuf, 0, fields);
rd_kafka_buf_destroy(rkbuf);
if (!partitions)
rd_kafka_buf_parse_fail(
Expand Down
9 changes: 6 additions & 3 deletions src/rdkafka_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,15 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
/* If there are no owned partitions, this is specified as an
* empty array, not NULL. */
rd_kafka_buf_write_i32(rkbuf, 0); /* Topic count */
else
else {
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(
rkbuf, owned_partitions,
rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
rd_false /*any offset*/, fields);
}

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
Expand Down
14 changes: 10 additions & 4 deletions src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1505,8 +1505,11 @@ static void rd_kafka_cgrp_handle_SyncGroup_memberstate(
rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);

rd_kafka_buf_read_i16(rkbuf, &Version);
if (!(assignment = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION)))
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
if (!(assignment =
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
goto err_parse;
rd_kafka_buf_read_bytes(rkbuf, &UserData);

Expand Down Expand Up @@ -1799,9 +1802,12 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
rd_kafka_buf_read_bytes(rkbuf, &UserData);
rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);

const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
if (Version >= 1 &&
!(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION)))
!(rkgm->rkgm_owned =
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
goto err;

rd_kafka_buf_destroy(rkbuf);
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,16 @@ rd_kafka_fetch_pos_make(int64_t offset, int32_t leader_epoch) {
return fetchpos;
}

#ifdef RD_HAS_STATEMENT_EXPRESSIONS
#define RD_KAFKA_FETCH_POS(offset, leader_epoch) \
({ \
rd_kafka_fetch_pos_t _fetchpos = {offset, leader_epoch}; \
_fetchpos; \
})
#else
#define RD_KAFKA_FETCH_POS(offset, leader_epoch) \
rd_kafka_fetch_pos_make(offset, leader_epoch)
#endif



Expand Down
45 changes: 30 additions & 15 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ int rd_kafka_err_action(rd_kafka_broker_t *rkb,
*
* @returns a newly allocated list on success, or NULL on parse error.
*/
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions0(
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
const rd_kafka_topic_partition_field_t *fields) {
Expand Down Expand Up @@ -305,7 +305,7 @@ rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions0(
*
* @remark The \p parts list MUST be sorted.
*/
int rd_kafka_buf_write_topic_partitions0(
int rd_kafka_buf_write_topic_partitions(
rd_kafka_buf_t *rkbuf,
const rd_kafka_topic_partition_list_t *parts,
rd_bool_t skip_invalid_offsets,
Expand Down Expand Up @@ -772,12 +772,14 @@ rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(
if (ApiVersion >= 2)
rd_kafka_buf_read_throttle_time(rkbuf);

*offsets = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
ApiVersion >= 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH
: RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET);
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
*offsets = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields);
if (!*offsets)
goto err_parse;

Expand Down Expand Up @@ -822,14 +824,16 @@ void rd_kafka_OffsetForLeaderEpochRequest(
rd_kafka_topic_partition_list_sort_by_topic(parts);

/* Write partition list */
rd_kafka_buf_write_topic_partitions(
rkbuf, parts, rd_false /*include invalid offsets*/,
rd_false /*skip valid offsets */,
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
/* CurrentLeaderEpoch */
RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH,
/* LeaderEpoch */
RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH);
RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(
rkbuf, parts, rd_false /*include invalid offsets*/,
rd_false /*skip valid offsets */, fields);

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

Expand Down Expand Up @@ -1152,10 +1156,12 @@ void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb,

/* Write partition list, filtering out partitions with valid
* offsets */
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
PartCnt = rd_kafka_buf_write_topic_partitions(
rkbuf, parts, rd_false /*include invalid offsets*/,
rd_false /*skip valid offsets */,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
rd_false /*skip valid offsets */, fields);
} else {
rd_kafka_buf_write_arraycnt_pos(rkbuf);
}
Expand Down Expand Up @@ -1590,10 +1596,13 @@ rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb,
/* GroupId */
rd_kafka_buf_write_str(rkbuf, grpoffsets->group, -1);

const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(
rkbuf, grpoffsets->partitions,
rd_false /*dont skip invalid offsets*/, rd_false /*any offset*/,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
fields);

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

Expand All @@ -1617,10 +1626,13 @@ rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf,
rkbuf = rd_kafka_buf_new(1, 100);
rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
rd_assert(rkgm->rkgm_assignment);
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(
rkbuf, rkgm->rkgm_assignment,
rd_false /*don't skip invalid offsets*/, rd_false /* any offset */,
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
fields);
rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);

/* Get pointer to binary buffer */
Expand Down Expand Up @@ -4006,10 +4018,13 @@ rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb,
rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteRecords, 1,
4 + (partitions->cnt * 100) + 4);

const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(
rkbuf, partitions, rd_false /*don't skip invalid offsets*/,
rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET);
rd_false /*any offset*/, fields);

/* timeout */
op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
Expand Down
20 changes: 2 additions & 18 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,17 @@ typedef enum {
RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
} rd_kafka_topic_partition_field_t;

rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions0(
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
rd_kafka_buf_t *rkbuf,
size_t estimated_part_cnt,
const rd_kafka_topic_partition_field_t *fields);

#define rd_kafka_buf_read_topic_partitions(rkbuf, estimated_part_cnt, ...) \
({ \
rd_kafka_topic_partition_field_t _f[] = { \
__VA_ARGS__, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; \
rd_kafka_buf_read_topic_partitions0(rkbuf, estimated_part_cnt, \
_f); \
})

int rd_kafka_buf_write_topic_partitions0(
int rd_kafka_buf_write_topic_partitions(
rd_kafka_buf_t *rkbuf,
const rd_kafka_topic_partition_list_t *parts,
rd_bool_t skip_invalid_offsets,
rd_bool_t only_invalid_offsets,
const rd_kafka_topic_partition_field_t *fields);
#define rd_kafka_buf_write_topic_partitions(rkbuf, parts, skip_invalid, \
only_invalid, ...) \
({ \
rd_kafka_topic_partition_field_t _f[] = { \
__VA_ARGS__, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; \
rd_kafka_buf_write_topic_partitions0( \
rkbuf, parts, skip_invalid, only_invalid, _f); \
})

rd_kafka_resp_err_t
rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
Expand Down
9 changes: 6 additions & 3 deletions src/rdkafka_sticky_assignor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1864,9 +1864,12 @@ static rd_kafkap_bytes_t *rd_kafka_sticky_assignor_get_metadata(

rkbuf = rd_kafka_buf_new(1, 100);
rd_assert(state->prev_assignment != NULL);
rd_kafka_buf_write_topic_partitions(
rkbuf, state->prev_assignment, rd_false /*skip invalid offsets*/,
rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION);
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
rd_kafka_buf_write_topic_partitions(rkbuf, state->prev_assignment,
rd_false /*skip invalid offsets*/,
rd_false /*any offset*/, fields);
rd_kafka_buf_write_i32(rkbuf, state->generation_id);

/* Get binary buffer and allocate a new Kafka Bytes with a copy. */
Expand Down
19 changes: 12 additions & 7 deletions src/rdkafka_txnmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -1496,9 +1496,11 @@ static void rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t *rk,

rd_kafka_buf_read_throttle_time(rkbuf);

partitions = rd_kafka_buf_read_topic_partitions(
rkbuf, 0, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR);
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
partitions = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields);
if (!partitions)
goto err_parse;

Expand Down Expand Up @@ -1705,13 +1707,16 @@ rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb,
}

/* Write per-partition offsets list */
cnt = rd_kafka_buf_write_topic_partitions(
rkbuf, rko->rko_u.txn.offsets, rd_true /*skip invalid offsets*/,
rd_false /*any offset*/, RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
const rd_kafka_topic_partition_field_t fields[] = {
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
ApiVersion >= 2 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH
: RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA);
RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA,
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
cnt = rd_kafka_buf_write_topic_partitions(
rkbuf, rko->rko_u.txn.offsets, rd_true /*skip invalid offsets*/,
rd_false /*any offset*/, fields);
if (!cnt) {
/* No valid partition offsets, don't commit. */
rd_kafka_buf_destroy(rkbuf);
Expand Down