Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b75c4d3
Add partition leader_epoch and change rktpar _private to point to glu…
edenhill Nov 30, 2022
3ac9783
Update Kafka protocol enums
edenhill Dec 1, 2022
94292e8
Leader epoch support, WIP
edenhill Dec 1, 2022
cb7e6e2
Test 0018: improve timeout robustness
edenhill Dec 6, 2022
b2e0652
Replace consumer offsets with fetch_pos (offset, epoch)
edenhill Dec 21, 2022
34316a8
rd_kafka_buf_(write|read)_topic_partition_list(): variable field orde…
edenhill Dec 21, 2022
b80356d
Added rd_kafka_buf_write_arraycnt()
edenhill Dec 21, 2022
f130f66
Additional partition leader epoch handling
edenhill Dec 21, 2022
0b0e94c
Test updates following stricter epoch handling
edenhill Dec 21, 2022
bb8c5ce
WIP: Reset preferred replica after refresh
milindl Jan 2, 2023
b78261f
Merge branch 'master' into kip320
emasab Jan 25, 2023
f3b384c
Add has reliable leader epoch check,
emasab Jan 26, 2023
8415d50
Fix in mock handler: TopicAuthorizedOperations
emasab Jan 26, 2023
73a371a
Delegate to leader on metadata refresh (#4163)
emasab Jan 27, 2023
6358405
Merge branch 'master' into feature/kip320
emasab Jan 27, 2023
80270ad
Fix upgrade Metadata to version 9
emasab Jan 27, 2023
9e787bb
Merge branch 'master' into feature/kip320
emasab Feb 10, 2023
4ac9b0d
MSVC compatibility about statement
emasab Feb 10, 2023
3dfa1f3
Differentiate current epoch from offset epoch
emasab Feb 20, 2023
200c4d6
Has reliable leader epochs function
emasab Feb 28, 2023
4658abd
Fix cached metadata without epoch
emasab Feb 28, 2023
680cf2e
Update fetch pos leader epoch with the
emasab Feb 28, 2023
1267ed6
Allow calling reset with log truncation error,
emasab Feb 28, 2023
6945ca9
Seek with wait was causing a deadlock,
emasab Feb 28, 2023
7b5a1a4
Remove write tags as it's written in
emasab Feb 28, 2023
d852323
Update next fetch position before validating
emasab Mar 1, 2023
2f02560
Replace current leader epoch after
emasab Mar 1, 2023
ade86e6
Force leader query parameter
emasab Mar 1, 2023
2de4371
Check error in requested partition too
emasab Mar 1, 2023
b3e388b
Fix error action and allow refresh to happen
emasab Mar 1, 2023
de379d1
Change action for unknown leader epoch:
emasab Mar 1, 2023
c70fc0d
Remove next leader epoch
emasab Mar 2, 2023
ef3b7fd
Fix there's no leader epoch associated to
emasab Mar 3, 2023
a77c2ee
rd_kafka_topic_partition_list_update copies epoch
emasab Mar 3, 2023
b96ee42
Check offsets and epoch in test 0103,
emasab Mar 6, 2023
0300b2e
Add leader epoch to consumer example
emasab Mar 7, 2023
5e50954
Validate seek requests
emasab Mar 7, 2023
b13bade
Merge branch 'master' into feature/kip320
emasab Mar 10, 2023
4f304d7
Don't export get and set current leader epoch
emasab Mar 19, 2023
88fc9be
Fetch mock handler mimics broker behavior
emasab Mar 19, 2023
a5e91a7
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
15ace01
Adapt batch consumer fix to KIP-320
emasab Mar 22, 2023
19195a7
Set app position before validating
emasab Mar 20, 2023
8a5c46a
librdkafka version v2.1.0
emasab Mar 22, 2023
2d8cfc3
Merge branch 'master' into feature/kip320
emasab Mar 22, 2023
8a324bf
Better message in the case no offset
emasab Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# librdkafka v2.0.3
# librdkafka v2.1.0

librdkafka v2.0.3 is a bugfix release:
librdkafka v2.1.0 is a feature release:

* [KIP-320](https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
Allow fetchers to detect and handle log truncation (#4122).
* Fix a reference count issue blocking the consumer from closing (#4187).
* Fix a protocol issue with ListGroups API, where an extra
field was appended for API Versions greater than or equal to 3.
field was appended for API Versions greater than or equal to 3 (#4207).
* Fix an issue with `max.poll.interval.ms`, where polling any queue would cause
the timeout to be reset (#4176).
* Fix seek partition timeout, was one thousand times lower than the passed
Expand All @@ -15,6 +17,18 @@ librdkafka v2.0.3 is a bugfix release:
* Upgrade OpenSSL to v3.0.8 with various security fixes,
check the [release notes](https://www.openssl.org/news/cl30.txt) (#4215).

## Enhancements

* Added `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`).
* Added partition leader epoch APIs:
- `rd_kafka_topic_partition_get_leader_epoch()` (and `set..()`)
- `rd_kafka_message_leader_epoch()`
- `rd_kafka_*assign()` and `rd_kafka_seek_partitions()` now supports
partitions with a leader epoch set.
- `rd_kafka_offsets_for_times()` will return per-partition leader-epochs.
- `leader_epoch`, `stored_leader_epoch`, and `committed_leader_epoch`
added to per-partition statistics.


## Fixes

Expand Down
4 changes: 2 additions & 2 deletions INTRODUCTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1887,7 +1887,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf
| KIP-289 - Consumer group.id default to NULL | 2.2.0 | Supported |
| KIP-294 - SSL endpoint verification | 2.0.0 | Supported |
| KIP-302 - Use all addresses for resolved broker hostname | 2.1.0 | Supported |
| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Not supported |
| KIP-320 - Consumer: handle log truncation | 2.1.0, 2.2.0 | Supported |
| KIP-322 - DeleteTopics disabled error code | 2.1.0 | Supported |
| KIP-339 - AdminAPI: incrementalAlterConfigs | 2.3.0 | Not supported |
| KIP-341 - Update Sticky partition assignment data | 2.3.0 | Not supported (superceeded by KIP-429) |
Expand Down Expand Up @@ -1953,7 +1953,7 @@ release of librdkafka.
| 0 | Produce | 9 | 7 |
| 1 | Fetch | 13 | 11 |
| 2 | ListOffsets | 7 | 2 |
| 3 | Metadata | 12 | 4 |
| 3 | Metadata | 12 | 9 |
| 8 | OffsetCommit | 8 | 7 |
| 9 | OffsetFetch | 8 | 7 |
| 10 | FindCoordinator | 4 | 2 |
Expand Down
3 changes: 3 additions & 0 deletions STATISTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,16 @@ query_offset | int gauge | | Current/Last logical offset query
next_offset | int gauge | | Next offset to fetch
app_offset | int gauge | | Offset of last message passed to application + 1
stored_offset | int gauge | | Offset to be committed
stored_leader_epoch | int | | Partition leader epoch of stored offset
committed_offset | int gauge | | Last committed offset
committed_leader_epoch | int | | Partition leader epoch of committed offset
eof_offset | int gauge | | Last PARTITION_EOF signaled offset
lo_offset | int gauge | | Partition's low watermark offset on broker
hi_offset | int gauge | | Partition's high watermark offset on broker
ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset is broker version is less than 0.11.0.0.
consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and committed_offset). hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset.
consumer_lag_stored | int gauge | | Difference between (hi_offset or ls_offset) and stored_offset. See consumer_lag and stored_offset.
leader_epoch | int | | Last known partition leader epoch, or -1 if unknown.
txmsgs | int | | Total number of messages transmitted (produced)
txbytes | int | | Total number of bytes transmitted for txmsgs
rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc).
Expand Down
2 changes: 1 addition & 1 deletion configure.self
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mkl_toggle_option "Development" ENABLE_VALGRIND "--enable-valgrind" "Enable in-c

mkl_toggle_option "Development" ENABLE_REFCNT_DEBUG "--enable-refcnt-debug" "Enable refcnt debugging" "n"

mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4-ext" "Enable external LZ4 library support (builtin version 1.9.2)" "y"
mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4-ext" "Enable external LZ4 library support (builtin version 1.9.3)" "y"
mkl_toggle_option "Feature" ENABLE_LZ4_EXT "--enable-lz4" "Deprecated: alias for --enable-lz4-ext" "y"

mkl_toggle_option "Feature" ENABLE_REGEX_EXT "--enable-regex-ext" "Enable external (libc) regex (else use builtin)" "y"
Expand Down
5 changes: 3 additions & 2 deletions examples/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,10 @@ int main(int argc, char **argv) {
}

/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
printf("Message on %s [%" PRId32 "] at offset %" PRId64
" (leader epoch %" PRId32 "):\n",
rd_kafka_topic_name(rkm->rkt), rkm->partition,
rkm->offset);
rkm->offset, rd_kafka_message_leader_epoch(rkm));

/* Print the message key. */
if (rkm->key && is_printable(rkm->key, rkm->key_len))
Expand Down
7 changes: 5 additions & 2 deletions src-cpp/HandleImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ rd_kafka_topic_partition_list_t *partitions_to_c_parts(
rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_list_add(
c_parts, tpi->topic_.c_str(), tpi->partition_);
rktpar->offset = tpi->offset_;
if (tpi->leader_epoch_ != -1)
rd_kafka_topic_partition_set_leader_epoch(rktpar, tpi->leader_epoch_);
}

return c_parts;
Expand All @@ -412,8 +414,9 @@ void update_partitions_from_c_parts(
dynamic_cast<RdKafka::TopicPartitionImpl *>(partitions[j]);
if (!strcmp(p->topic, pp->topic_.c_str()) &&
p->partition == pp->partition_) {
pp->offset_ = p->offset;
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
pp->offset_ = p->offset;
pp->err_ = static_cast<RdKafka::ErrorCode>(p->err);
pp->leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(p);
}
}
}
Expand Down
43 changes: 42 additions & 1 deletion src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ namespace RdKafka {
* @remark This value should only be used during compile time,
* for runtime checks of version use RdKafka::version()
*/
#define RD_KAFKA_VERSION 0x020002ff
#define RD_KAFKA_VERSION 0x020100ff

/**
* @brief Returns the librdkafka version as integer.
Expand Down Expand Up @@ -324,6 +324,8 @@ enum ErrorCode {
ERR__NOOP = -141,
/** No offset to automatically reset to */
ERR__AUTO_OFFSET_RESET = -140,
/** Partition log truncation detected */
ERR__LOG_TRUNCATION = -139,

/** End internal error codes */
ERR__END = -100,
Expand Down Expand Up @@ -1978,6 +1980,12 @@ class RD_EXPORT TopicPartition {

/** @returns error code (if applicable) */
virtual ErrorCode err() const = 0;

/** @brief Get partition leader epoch, or -1 if not known or relevant. */
virtual int32_t get_leader_epoch() = 0;

/** @brief Set partition leader epoch. */
virtual void set_leader_epoch(int32_t leader_epoch) = 0;
};


Expand Down Expand Up @@ -2035,6 +2043,11 @@ class RD_EXPORT Topic {
* The offset will be committed (written) to the broker (or file) according
* to \p auto.commit.interval.ms or next manual offset-less commit call.
*
* @deprecated This API lacks support for partition leader epochs, which makes
* it at risk for unclean leader election log truncation issues.
* Use KafkaConsumer::offsets_store() or
* Message::offset_store() instead.
*
* @remark \c enable.auto.offset.store must be set to \c false when using
* this API.
*
Expand Down Expand Up @@ -2465,6 +2478,31 @@ class RD_EXPORT Message {
/** @returns the broker id of the broker the message was produced to or
* fetched from, or -1 if not known/applicable. */
virtual int32_t broker_id() const = 0;

/** @returns the message's partition leader epoch at the time the message was
* fetched and if known, else -1. */
virtual int32_t leader_epoch() const = 0;

/**
* @brief Store offset +1 for the consumed message.
*
* The message offset + 1 will be committed to broker according
* to \c `auto.commit.interval.ms` or manual offset-less commit()
*
* @warning This method may only be called for partitions that are currently
* assigned.
* Non-assigned partitions will fail with ERR__STATE.
*
* @warning Avoid storing offsets after calling seek() (et.al) as
* this may later interfere with resuming a paused partition, instead
* store offsets prior to calling seek.
*
* @remark \c `enable.auto.offset.store` must be set to "false" when using
* this API.
*
* @returns NULL on success or an error object on failure.
*/
virtual Error *offset_store() = 0;
};

/**@}*/
Expand Down Expand Up @@ -2865,6 +2903,9 @@ class RD_EXPORT KafkaConsumer : public virtual Handle {
* @remark \c enable.auto.offset.store must be set to \c false when using
* this API.
*
* @remark The leader epoch, if set, will be used to fence outdated partition
* leaders. See TopicPartition::set_leader_epoch().
*
* @returns RdKafka::ERR_NO_ERROR on success, or
* RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could
* be stored, or
Expand Down
39 changes: 33 additions & 6 deletions src-cpp/rdkafkacpp_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,21 @@ class MessageImpl : public Message {
return rd_kafka_message_broker_id(rkmessage_);
}

int32_t leader_epoch() const {
return rd_kafka_message_leader_epoch(rkmessage_);
}


Error *offset_store() {
rd_kafka_error_t *c_error;

c_error = rd_kafka_offset_store_message(rkmessage_);

if (c_error)
return new ErrorImpl(c_error);
else
return NULL;
}

RdKafka::Topic *topic_;
rd_kafka_message_t *rkmessage_;
Expand Down Expand Up @@ -1227,21 +1242,24 @@ class TopicPartitionImpl : public TopicPartition {
topic_(topic),
partition_(partition),
offset_(RdKafka::Topic::OFFSET_INVALID),
err_(ERR_NO_ERROR) {
err_(ERR_NO_ERROR),
leader_epoch_(-1) {
}

TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) :
topic_(topic),
partition_(partition),
offset_(offset),
err_(ERR_NO_ERROR) {
err_(ERR_NO_ERROR),
leader_epoch_(-1) {
}

TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) {
topic_ = std::string(c_part->topic);
partition_ = c_part->partition;
offset_ = c_part->offset;
err_ = static_cast<ErrorCode>(c_part->err);
topic_ = std::string(c_part->topic);
partition_ = c_part->partition;
offset_ = c_part->offset;
err_ = static_cast<ErrorCode>(c_part->err);
leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part);
// FIXME: metadata
}

Expand All @@ -1266,6 +1284,14 @@ class TopicPartitionImpl : public TopicPartition {
offset_ = offset;
}

int32_t get_leader_epoch() {
return leader_epoch_;
}

void set_leader_epoch(int32_t leader_epoch) {
leader_epoch_ = leader_epoch_;
}

std::ostream &operator<<(std::ostream &ostrm) const {
return ostrm << topic_ << " [" << partition_ << "]";
}
Expand All @@ -1274,6 +1300,7 @@ class TopicPartitionImpl : public TopicPartition {
int partition_;
int64_t offset_;
ErrorCode err_;
int32_t leader_epoch_;
};


Expand Down
Loading