Merged
10 changes: 10 additions & 0 deletions packaging/tools/style-format.sh
@@ -57,6 +57,8 @@ if [[ -z $cpp_style ]]; then
fi

extra_info=""
clang_format_version=10
clang_version_warning=0

for f in $*; do

@@ -81,6 +83,14 @@ for f in $*; do
stylename="C"
fi

if [[ $lang == c && $clang_version_warning == 0 ]]; then
# Check if clang-format is at the correct version, and if not, warn.
if ! clang-format --version | grep -o "version ${clang_format_version}\.[[:digit:]]\+\.[[:digit:]]\+" 1> /dev/null; then
echo "Recommended clang-format version should be ${clang_format_version}.x.y, is '$(clang-format --version)'" 1>&2
clang_version_warning=1
fi
fi

check=0

if [[ $fix == 1 ]]; then
1 change: 1 addition & 0 deletions src/rdkafka.h
@@ -6287,6 +6287,7 @@ typedef rd_kafka_resp_err_t(rd_kafka_interceptor_f_on_thread_exit_t)(
* @param secproto The security protocol.
* @param name The original name of the broker.
* @param port The port of the broker.
* @param state Broker state name.
* @param ic_opaque The interceptor's opaque pointer specified in ..add..().
*
* @returns an error code on failure, the error is logged but otherwise ignored.
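For context, a minimal sketch (not part of this diff) of an on_broker_state_change interceptor callback using the parameters documented above. The function name my_on_broker_state_change and the exact parameter order (rk, broker_id, secproto, name, port, state, ic_opaque) are illustrative assumptions; the callback would be registered through the corresponding ..add..() function mentioned in the ic_opaque description.

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical example callback: logs each broker state transition.
 * Parameter order is assumed; see the interceptor typedef in rdkafka.h. */
static rd_kafka_resp_err_t
my_on_broker_state_change(rd_kafka_t *rk,
                          int32_t broker_id,
                          const char *secproto,
                          const char *name,
                          int port,
                          const char *state,
                          void *ic_opaque) {
        (void)rk;
        (void)ic_opaque;
        fprintf(stderr, "broker %" PRId32 " (%s://%s:%d) is now %s\n",
                broker_id, secproto, name, port, state);
        /* An error returned here is logged but otherwise ignored,
         * as noted in the @returns documentation above. */
        return RD_KAFKA_RESP_ERR_NO_ERROR;
}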
17 changes: 17 additions & 0 deletions src/rdkafka_broker.c
@@ -554,6 +554,7 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
va_list ap;
rd_kafka_bufq_t tmpq_waitresp, tmpq;
int old_state;
rd_kafka_toppar_t *rktp;

rd_kafka_assert(rkb->rkb_rk, thrd_is_current(rkb->rkb_thread));

@@ -641,6 +642,22 @@
rd_kafka_bufq_dump(rkb, "BRKOUTBUFS", &rkb->rkb_outbufs);
}

/* If this broker acts as the preferred (follower) replica for any
* partition, delegate the partition back to the leader. */
TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
rd_kafka_toppar_lock(rktp);
if (unlikely(rktp->rktp_broker != rkb)) {
/* Currently migrating away from this
* broker, skip. */
rd_kafka_toppar_unlock(rktp);
continue;
}
rd_kafka_toppar_unlock(rktp);

if (rktp->rktp_leader_id != rktp->rktp_broker_id) {
rd_kafka_toppar_delegate_to_leader(rktp);
}
}

/* Query for topic leaders to quickly pick up on failover. */
if (err != RD_KAFKA_RESP_ERR__DESTROY &&
56 changes: 26 additions & 30 deletions src/rdkafka_fetcher.c
@@ -182,33 +182,6 @@ static void rd_kafka_fetch_reply_handle_partition_error(
* application while some handled by rdkafka */
switch (err) {
/* Errors handled by rdkafka */
case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER:
case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
/* Reset preferred replica before refreshing metadata */
if (rktp->rktp_broker_id != rktp->rktp_leader_id)
rd_kafka_toppar_delegate_to_leader(rktp);

/* Request metadata information update and retry */
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH:
rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_CONSUMER, "FETCH",
"Topic %s [%" PRId32
"]: Fetch failed at %s: %s: broker %" PRId32
"has not yet caught up on latest metadata: "
"retrying",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rd_kafka_err2str(err), rktp->rktp_broker_id);
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE:
/* Occurs when:
* - Msg exists on broker but
@@ -218,8 +191,8 @@
* (replica is out of sync).
* - partition leader is out of sync.
*
* Handle by requesting metadata update and
* retrying FETCH (with backoff).
* Handle by requesting metadata update, changing back to the
* leader, and then retrying FETCH (with backoff).
*/
rd_rkb_dbg(rkb, MSG, "FETCH",
"Topic %s [%" PRId32
@@ -229,7 +202,30 @@ static void rd_kafka_fetch_reply_handle_partition_error(
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rktp->rktp_broker_id, rktp->rktp_leader_id);
rd_kafka_topic_fast_leader_query(rkb->rkb_rk);
/* Continue with next case */
case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER:
case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH:
if (rktp->rktp_broker_id != rktp->rktp_leader_id) {
rd_kafka_toppar_delegate_to_leader(rktp);
}
/* Request metadata information update */
rd_kafka_toppar_leader_unavailable(rktp, "fetch", err);
break;

case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH:
rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_CONSUMER, "FETCH",
"Topic %s [%" PRId32
"]: Fetch failed at %s: %s: broker %" PRId32
"has not yet caught up on latest metadata: "
"retrying",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos),
rd_kafka_err2str(err), rktp->rktp_broker_id);
break;

case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE: {