Merged
46 commits
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility about statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
Delegate to leader on metadata refresh (#4163)
Co-authored-by: Milind L <[email protected]>
emasab and milindl authored Jan 27, 2023
commit 73a371a45c68cb9d81c7033b87cbb6977002cc72
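
For context before the per-file diff: this branch exposes the partition leader epoch to applications (see the "Add partition leader_epoch...", "Add leader epoch to consumer example" and "librdkafka version v2.1.0" commits above). The following is an illustrative sketch only, not part of this commit's diff — it assumes the public v2.1.0 accessors rd_kafka_message_leader_epoch() and rd_kafka_topic_partition_set_leader_epoch() — showing how a consumer can store an offset together with its leader epoch so the broker can later detect log truncation:

/* Illustrative sketch, not part of this commit: assumes the librdkafka
 * v2.1.0 KIP-320 accessors. */
#include <librdkafka/rdkafka.h>

static void commit_position_with_epoch(rd_kafka_t *rk,
                                       const rd_kafka_message_t *rkm) {
        rd_kafka_topic_partition_list_t *offsets =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
            offsets, rd_kafka_topic_name(rkm->rkt), rkm->partition);

        /* Next offset to consume plus the leader epoch of the consumed
         * message: with both stored, a later seek/commit lets the broker
         * detect log truncation (KIP-320). */
        rktpar->offset = rkm->offset + 1;
        rd_kafka_topic_partition_set_leader_epoch(
            rktpar, rd_kafka_message_leader_epoch(rkm));

        rd_kafka_commit(rk, offsets, 1 /*async*/);
        rd_kafka_topic_partition_list_destroy(offsets);
}
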
1 change: 1 addition & 0 deletions src/rdkafka.h
@@ -6287,6 +6287,7 @@ typedef rd_kafka_resp_err_t(rd_kafka_interceptor_f_on_thread_exit_t)(
* @param secproto The security protocol.
* @param name The original name of the broker.
* @param port The port of the broker.
* @param state Broker state name.
* @param ic_opaque The interceptor's opaque pointer specified in ..add..().
*
* @returns an error code on failure, the error is logged but otherwise ignored.
17 changes: 17 additions & 0 deletions src/rdkafka_broker.c
@@ -554,6 +554,7 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
va_list ap;
rd_kafka_bufq_t tmpq_waitresp, tmpq;
int old_state;
rd_kafka_toppar_t *rktp;

rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

@@ -641,6 +642,22 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
}

/* If this broker acts as the preferred (follower) replica for any
* partition, delegate the partition back to the leader. */
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
rd_kafka_toppar_lock(rktp);
if (unlikely(rktp->rktp_broker != rkb)) {
/* Currently migrating away from this
* broker, skip. */
rd_kafka_toppar_unlock(rktp);
continue;
}
rd_kafka_toppar_unlock(rktp);

if (rktp->rktp_leader_id != rktp->rktp_broker_id) {
rd_kafka_toppar_delegate_to_leader(rktp);
}
}

/* Query for topic leaders to quickly pick up on failover. */
if (err != RD_KAFKA_RESP_ERR__DESTROY &&
56 changes: 26 additions & 30 deletions src/rdkafka_fetcher.c
@@ -182,33 +182,6 @@ static void rd_kafka_fetch_reply_handle_partition_error(
* application while some handled by rdkafka */
switch (err) {
/* Errors handled by rdkafka */
case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER:
case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
/* Reset preferred replica before refreshing metadata */
if (rktp->rktp_broker_id != rktp->rktp_leader_id)
rd_kafka_toppar_delegate_to_leader(rktp);

/* Request metadata information update and retry */
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH:
rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_CONSUMER, "FETCH",
"Topic %s [%" PRId32
"]: Fetch failed at %s: %s: broker %" PRId32
"has not yet caught up on latest metadata: "
"retrying",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rd_kafka_err2str(err), rktp->rktp_broker_id);
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE:
/* Occurs when:
* - Msg exists on broker but
@@ -218,8 +191,8 @@ static void rd_kafka_fetch_reply_handle_partition_error(
* (replica is out of sync).
* - partition leader is out of sync.
*
* Handle by requesting metadata update and
* retrying FETCH (with backoff).
* Handle by requesting metadata update, changing back to the
* leader, and then retrying FETCH (with backoff).
*/
rd_rkb_dbg(rkb, MSG, "FETCH",
"Topic %s [%" PRId32
@@ -229,7 +202,30 @@ static void rd_kafka_fetch_reply_handle_partition_error(
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rktp->rktp_broker_id, rktp->rktp_leader_id);
rd_kafka_topic_fast_leader_query(rkb->rkb_rk);
/* Continue with next case */
case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER:
case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
if (rktp->rktp_broker_id != rktp->rktp_leader_id) {
rd_kafka_toppar_delegate_to_leader(rktp);
}
/* Request metadata information update */
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH:
rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_CONSUMER, "FETCH",
"Topic %s [%" PRId32
"]: Fetch failed at %s: %s: broker %" PRId32
"has not yet caught up on latest metadata: "
"retrying",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rd_kafka_err2str(err), rktp->rktp_broker_id);
break;

case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE: {
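Because the rendered diff above interleaves the removed and added lines, the final shape of the error handling in rd_kafka_fetch_reply_handle_partition_error() can be hard to read. The following is a condensed sketch (not verbatim source; it only uses names that appear in the diff above): OFFSET_NOT_AVAILABLE triggers a fast leader query and falls through into the shared leader-refresh handling, the leader/replica errors delegate the partition back to the leader before flagging it unavailable, and UNKNOWN_LEADER_EPOCH only logs and retries.

/* Condensed sketch of the consolidated switch after this commit. */
switch (err) {
case RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE:
        /* Log, ask for a fast leader query, then fall through to the
         * shared leader-refresh handling below. */
        rd_kafka_topic_fast_leader_query(rkb->rkb_rk);
        /* FALLTHRU */
case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER:
case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
        /* If currently fetching from a preferred (follower) replica,
         * delegate the partition back to the leader, then request a
         * metadata update and retry. */
        if (rktp->rktp_broker_id != rktp->rktp_leader_id)
                rd_kafka_toppar_delegate_to_leader(rktp);
        rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
        break;

case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH:
        /* Broker has not yet caught up on latest metadata: log and retry. */
        break;

/* ... OFFSET_OUT_OF_RANGE and the remaining cases elided ... */
}
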
4 changes: 2 additions & 2 deletions src/rdkafka_request.c
@@ -2215,8 +2215,8 @@ rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
.rkmc_full_topics_sent;

if (topic_cnt == 0 && ApiVersion >= 1 && ApiVersion < 9) {
rd_kafka_buf_update_i32(rkbuf,
of_TopicArrayCnt, -1); /* Null: all topics*/
rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt,
-1); /* Null: all topics*/
} else {
rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
topic_cnt);
232 changes: 229 additions & 3 deletions tests/0104-fetch_from_follower_mock.c
@@ -33,6 +33,25 @@
* @name Fetch from follower tests using the mock broker.
*/

static int allowed_error;

/**
* @brief Decide what error_cb's will cause the test to fail.
*/
static int
error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
if (err == allowed_error ||
/* If transport errors are allowed then it is likely
* that we'll also see ALL_BROKERS_DOWN. */
(allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
TEST_SAY("Ignoring allowed error: %s: %s\n",
rd_kafka_err2name(err), reason);
return 0;
}
return 1;
}


/**
* @brief Test offset reset when fetching from replica.
@@ -248,7 +267,7 @@ static void do_test_replica_not_available(void) {
const char *topic = "test";
const int msgcnt = 1000;

TEST_SAY(_C_MAG "[ Test REPLICA_NOT_AVAIALBLE ]\n");
TEST_SAY(_C_MAG "[ Test REPLICA_NOT_AVAILABLE ]\n");

mcluster = test_mock_cluster_new(3, &bootstraps);

@@ -283,7 +302,7 @@
RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0);


test_consumer_assign_partition("REPLICA_NOT_AVAIALBLE", c, topic, 0,
test_consumer_assign_partition("REPLICA_NOT_AVAILABLE", c, topic, 0,
RD_KAFKA_OFFSET_INVALID);

test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000);
@@ -300,7 +319,207 @@

test_mock_cluster_destroy(mcluster);

TEST_SAY(_C_GRN "[ Test REPLICA_NOT_AVAIALBLE PASSED ]\n");
TEST_SAY(_C_GRN "[ Test REPLICA_NOT_AVAILABLE PASSED ]\n");
}

/**
* @brief When a Fetch request fails with error \p err, the consumer should
* query for the new leader or preferred replica and refresh metadata.
*/
static void do_test_delegate_to_leader_on_error(rd_kafka_resp_err_t err) {
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_conf_t *conf;
rd_kafka_t *c;
const char *topic = "test";
const int msgcnt = 1000;
const char *errstr = rd_kafka_err2name(err);

TEST_SAY(_C_MAG "[ Test %s ]\n", errstr);

mcluster = test_mock_cluster_new(3, &bootstraps);

/* Seed the topic with messages */
test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10,
"bootstrap.servers", bootstraps,
"batch.num.messages", "10", NULL);

/* Set partition leader to broker 1. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);

test_conf_init(&conf, NULL, 0);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "client.rack", "myrack");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
test_conf_set(conf, "fetch.error.backoff.ms", "1000");

c = test_create_consumer("mygroup", NULL, conf, NULL);

rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1 /*Broker 1*/, 1 /*FetchRequest*/, 10, err, 0, err, 0,
err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0);


test_consumer_assign_partition(errstr, c, topic, 0,
RD_KAFKA_OFFSET_INVALID);

test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000);

/* Switch leader to broker 2 so that metadata is updated,
* causing the consumer to start fetching from the new leader. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);

test_consumer_poll_timeout("Consume", c, 0, 1, 0, msgcnt, NULL, 2000);

test_consumer_close(c);

rd_kafka_destroy(c);

test_mock_cluster_destroy(mcluster);

TEST_SAY(_C_GRN "[ Test %s ]\n", errstr);
}

/**
* @brief Test when the preferred replica is no longer a follower of the
* partition leader. We should try fetch from the leader instead.
*/
static void do_test_not_leader_or_follower(void) {
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_conf_t *conf;
rd_kafka_t *c;
const char *topic = "test";
const int msgcnt = 10;

TEST_SAY(_C_MAG "[ Test NOT_LEADER_OR_FOLLOWER ]\n");

mcluster = test_mock_cluster_new(3, &bootstraps);
/* Set partition leader to broker 1. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);

test_conf_init(&conf, NULL, 0);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "client.rack", "myrack");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
test_conf_set(conf, "fetch.error.backoff.ms", "1000");
test_conf_set(conf, "fetch.message.max.bytes", "10");

c = test_create_consumer("mygroup", NULL, conf, NULL);

test_consumer_assign_partition("NOT_LEADER_OR_FOLLOWER", c, topic, 0,
RD_KAFKA_OFFSET_INVALID);

/* Since there are no messages, this poll only waits for metadata, and
* then sets the preferred replica after the first fetch request. */
test_consumer_poll_no_msgs("Initial metadata and preferred replica set",
c, 0, 2000);

/* Change the follower, so that the preferred replica is no longer the
* leader or follower. */
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, -1);

/* Seed the topic with messages */
test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
"bootstrap.servers", bootstraps,
"batch.num.messages", "10", NULL);

/* On getting a NOT_LEADER_OR_FOLLOWER error, we should change to the
* leader and fetch from there without timing out. */
test_msgver_t mv;
test_msgver_init(&mv, 0);
test_consumer_poll_timeout("from leader", c, 0, 1, 0, msgcnt, &mv,
2000);
test_msgver_verify0(
__FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID,
(struct test_mv_vs) {
.msg_base = 0, .exp_cnt = msgcnt, .broker_id = 1});
test_msgver_clear(&mv);

test_consumer_close(c);

rd_kafka_destroy(c);

test_mock_cluster_destroy(mcluster);

TEST_SAY(_C_GRN "[ Test NOT_LEADER_OR_FOLLOWER PASSED ]\n");
}


/**
* @brief Test when the preferred replica broker goes down. When a broker is
* going down, we should delegate all its partitions to their leaders.
*/
static void do_test_follower_down(void) {
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
rd_kafka_conf_t *conf;
rd_kafka_t *c;
const char *topic = "test";
const int msgcnt = 10;

TEST_SAY(_C_MAG "[ Test with follower down ]\n");

mcluster = test_mock_cluster_new(3, &bootstraps);
/* Set partition leader to broker 1. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);

test_conf_init(&conf, NULL, 0);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "client.rack", "myrack");
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
test_conf_set(conf, "fetch.error.backoff.ms", "1000");
test_conf_set(conf, "fetch.message.max.bytes", "10");

c = test_create_consumer("mygroup", NULL, conf, NULL);

test_consumer_assign_partition("follower down", c, topic, 0,
RD_KAFKA_OFFSET_INVALID);

/* Since there are no messages, this poll only waits for metadata, and
* then sets the preferred replica after the first fetch request. */
test_consumer_poll_no_msgs("Initial metadata and preferred replica set",
c, 0, 2000);


/* Seed the topic with messages */
test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
"bootstrap.servers", bootstraps,
"batch.num.messages", "10", NULL);

/* Bring the follower down. When the follower is DOWN we also expect
* the cluster itself to know about it and not point us back to the
* downed broker as preferred replica; to reflect that, the mock
* follower is switched from broker 2 to broker 3. */
allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
test_curr->is_fatal_cb = error_is_fatal_cb;
rd_kafka_mock_broker_set_down(mcluster, 2);
rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3);

/* We should change to the new follower when the old one goes down,
* and fetch from there without timing out. */
test_msgver_t mv;
test_msgver_init(&mv, 0);
test_consumer_poll_timeout("from other follower", c, 0, 1, 0, msgcnt,
&mv, 2000);
test_msgver_verify0(
__FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID,
(struct test_mv_vs) {
.msg_base = 0, .exp_cnt = msgcnt, .broker_id = 3});
test_msgver_clear(&mv);

test_consumer_close(c);

rd_kafka_destroy(c);

test_mock_cluster_destroy(mcluster);

TEST_SAY(_C_GRN "[ Test with follower down PASSED ]\n");
}


@@ -320,5 +539,12 @@ int main_0104_fetch_from_follower_mock(int argc, char **argv) {

do_test_replica_not_available();

do_test_delegate_to_leader_on_error(
RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE);

do_test_not_leader_or_follower();

do_test_follower_down();

return 0;
}