Skip to content

Commit 24712ec

Browse files
piotrsmolinski authored and AndersBroman committed
Kafka: fix the FETCH response alignment issue
There was a problem in FETCH response parsing when the server had more data than the requested maximal return size. In such a case the server checks if the first chunk of data fits into the buffer. If it does not, the first chunk is returned as a whole to the requestor. Otherwise it is assumed that the client is capable of discarding invalid content and the server pushes the maximum available block. It makes sense, because the default block is 10MB and pushing it opaque leverages zero-copy IO from the file system to the network. In the existing implementation it was assumed that the last batch is aligned with the end of the buffer. Actually, if there is some more data, the last part is delivered truncated.

This patch:
* fixes the last part alignment handling
* adds an opaque field for truncated content
* moves the preferred replica field to the proper context

Bug: 16623
Change-Id: Iee6d513ce6711091e5561646a3fd563501eabdda
Reviewed-on: https://code.wireshark.org/review/37446
Petri-Dish: Alexis La Goutte <[email protected]>
Tested-by: Petri Dish Buildbot
Reviewed-by: Anders Broman <[email protected]>
1 parent c97076b commit 24712ec

File tree

1 file changed

+35
-13
lines changed

1 file changed

+35
-13
lines changed

epan/dissectors/packet-kafka.c

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ static int hf_kafka_batch_size = -1;
8585
static int hf_kafka_message_key = -1;
8686
static int hf_kafka_message_value = -1;
8787
static int hf_kafka_message_compression_reduction = -1;
88+
static int hf_kafka_truncated_content = -1;
8889
static int hf_kafka_request_frame = -1;
8990
static int hf_kafka_response_frame = -1;
9091
static int hf_kafka_consumer_group = -1;
@@ -1572,11 +1573,12 @@ decompress(tvbuff_t *tvb, packet_info *pinfo, int offset, guint32 length, int co
15721573
* tree: protocol information tree to append the item
15731574
* hf_item: protocol information item descriptor index
15741575
* offset: pointer to the message
1576+
* end_offset: last possible offset in this batch
15751577
*
15761578
* returns: pointer to the next message/batch
15771579
*/
15781580
static int
1579-
dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
1581+
dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_)
15801582
{
15811583
proto_item *message_ti;
15821584
proto_tree *subtree;
@@ -1657,11 +1659,12 @@ dissect_kafka_message_old(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
16571659
* tree: protocol information tree to append the item
16581660
* hf_item: protocol information item descriptor index
16591661
* offset: pointer to the message
1662+
* end_offset: last possible offset in this batch
16601663
*
16611664
* returns: pointer to the next message/batch
16621665
*/
16631666
static int
1664-
dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
1667+
dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset _U_)
16651668
{
16661669
proto_item *batch_ti;
16671670
proto_tree *subtree;
@@ -1749,15 +1752,30 @@ dissect_kafka_message_new(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
17491752
}
17501753

17511754
static int
1752-
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset)
1755+
dissect_kafka_message(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, int offset, int end_offset)
17531756
{
17541757
gint8 magic_byte;
1758+
guint32 message_size;
17551759

1756-
magic_byte = tvb_get_guint8(tvb, offset+16);
1760+
if (offset + 12 > end_offset) {
1761+
// in this case we deal with truncated message, where the size part may be also truncated
1762+
// actually we may add truncated info
1763+
proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA);
1764+
return end_offset;
1765+
}
1766+
message_size = tvb_get_guint32(tvb, offset + 8, ENC_BIG_ENDIAN);
1767+
if (offset + 12 + message_size > (guint32)end_offset) {
1768+
// in this case we deal with truncated message, where the truncation point falls somewhere
1769+
// in the message body
1770+
proto_tree_add_item(tree, hf_kafka_truncated_content, tvb, offset, end_offset-offset, ENC_NA);
1771+
return end_offset;
1772+
}
1773+
1774+
magic_byte = tvb_get_guint8(tvb, offset + 16);
17571775
if (magic_byte < 2) {
1758-
return dissect_kafka_message_old(tvb, pinfo, tree, offset);
1776+
return dissect_kafka_message_old(tvb, pinfo, tree, offset, end_offset);
17591777
} else {
1760-
return dissect_kafka_message_new(tvb, pinfo, tree, offset);
1778+
return dissect_kafka_message_new(tvb, pinfo, tree, offset, end_offset);
17611779
}
17621780
}
17631781

@@ -1776,15 +1794,14 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, g
17761794
}
17771795

17781796
while (offset < end_offset) {
1779-
offset = dissect_kafka_message(tvb, pinfo, subtree, offset);
1797+
offset = dissect_kafka_message(tvb, pinfo, subtree, offset, end_offset);
17801798
messages += 1;
17811799
}
17821800

17831801
if (offset != end_offset) {
17841802
expert_add_info(pinfo, ti, &ei_kafka_bad_message_set_length);
17851803
}
17861804

1787-
proto_item_append_text(ti, " (%d Messages)", messages);
17881805
proto_item_set_end(ti, tvb, offset);
17891806

17901807
return offset;
@@ -2873,6 +2890,11 @@ dissect_kafka_fetch_response_partition(tvbuff_t *tvb, packet_info *pinfo, proto_
28732890
offset = dissect_kafka_aborted_transactions(tvb, pinfo, subtree, offset, api_version);
28742891
}
28752892

2893+
if (api_version >= 11) {
2894+
proto_tree_add_item(subtree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
2895+
offset += 4;
2896+
}
2897+
28762898
len = tvb_get_ntohl(tvb, offset);
28772899
offset += 4;
28782900

@@ -2925,11 +2947,6 @@ dissect_kafka_fetch_response(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree
29252947
offset += 4;
29262948
}
29272949

2928-
if (api_version >= 11) {
2929-
proto_tree_add_item(tree, hf_kafka_replica, tvb, offset, 4, ENC_BIG_ENDIAN);
2930-
offset += 4;
2931-
}
2932-
29332950
return dissect_kafka_array(tree, tvb, pinfo, offset, api_version, &dissect_kafka_fetch_response_topic);
29342951
}
29352952

@@ -8425,6 +8442,11 @@ proto_register_kafka(void)
84258442
FT_FLOAT, BASE_NONE, 0, 0,
84268443
NULL, HFILL }
84278444
},
8445+
{ &hf_kafka_truncated_content,
8446+
{ "Truncated Content", "kafka.truncated_content",
8447+
FT_BYTES, BASE_NONE, 0, 0,
8448+
NULL, HFILL }
8449+
},
84288450
{ &hf_kafka_consumer_group,
84298451
{ "Consumer Group", "kafka.consumer_group",
84308452
FT_STRING, STR_ASCII, 0, 0,

0 commit comments

Comments (0)