Skip to content

Commit 3f48c17

Browse files
AVP42wufc
andauthored
[ISSUE# 2330] Store the properties of MessageBatch (apache#2343)
* [ISSUE# 2330]store the properties of MessageBatch * [ISSUE# 2330]fix style problem * [ISSUE# 2330]Undo newly added property in the MessageBatch Co-authored-by: wufc <[email protected]>
1 parent d5cb67f commit 3f48c17

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1795,6 +1795,11 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
17951795
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
17961796
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
17971797

1798+
// properties from MessageExtBatch
1799+
String batchPropStr = MessageDecoder.messageProperties2String(messageExtBatch.getProperties());
1800+
final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8);
1801+
final short batchPropLen = (short) batchPropData.length;
1802+
17981803
while (messagesByteBuff.hasRemaining()) {
17991804
// 1 TOTALSIZE
18001805
messagesByteBuff.getInt();
@@ -1818,7 +1823,8 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
18181823

18191824
final int topicLength = topicData.length;
18201825

1821-
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength, propertiesLen);
1826+
final int msgLen = calMsgLength(messageExtBatch.getSysFlag(), bodyLen, topicLength,
1827+
propertiesLen + batchPropLen);
18221828

18231829
// Exceeds the maximum message
18241830
if (msgLen > this.maxMessageSize) {
@@ -1871,9 +1877,13 @@ public ByteBuffer encode(final MessageExtBatch messageExtBatch) {
18711877
this.msgBatchMemory.put((byte) topicLength);
18721878
this.msgBatchMemory.put(topicData);
18731879
// 17 PROPERTIES
1874-
this.msgBatchMemory.putShort(propertiesLen);
1875-
if (propertiesLen > 0)
1880+
this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen));
1881+
if (propertiesLen > 0) {
18761882
this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen);
1883+
}
1884+
if (batchPropLen > 0) {
1885+
this.msgBatchMemory.put(batchPropData, 0, batchPropLen);
1886+
}
18771887
}
18781888
msgBatchMemory.flip();
18791889
return msgBatchMemory;

store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.net.InetSocketAddress;
3636
import java.nio.charset.Charset;
3737
import java.util.ArrayList;
38+
import java.util.HashMap;
3839
import java.util.List;
3940
import java.util.Map;
4041

@@ -80,6 +81,12 @@ private MessageStore buildMessageStore() throws Exception {
8081

8182
@Test
8283
public void testPutMessages() throws Exception {
84+
String batchPropK = "extraKey";
85+
String batchPropV = "extraValue";
86+
Map<String, String> batchProp = new HashMap<>(1);
87+
batchProp.put(batchPropK, batchPropV);
88+
short batchPropLen = (short) messageProperties2String(batchProp).getBytes(MessageDecoder.CHARSET_UTF8).length;
89+
8390
List<Message> messages = new ArrayList<>();
8491
String topic = "batch-write-topic";
8592
int queue = 0;
@@ -98,14 +105,15 @@ public void testPutMessages() throws Exception {
98105
short propertiesLength = (short) propertiesBytes.length;
99106
final byte[] topicData = msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
100107
final int topicLength = topicData.length;
101-
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength) + msgLengthArr[j - 1];
108+
msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, propertiesLength+batchPropLen) + msgLengthArr[j - 1];
102109
j++;
103110
}
104111
byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
105112
MessageExtBatch messageExtBatch = new MessageExtBatch();
106113
messageExtBatch.setTopic(topic);
107114
messageExtBatch.setQueueId(queue);
108115
messageExtBatch.setBody(batchMessageBody);
116+
messageExtBatch.putUserProperty(batchPropK,batchPropV);
109117
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
110118
messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 125));
111119
messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 126));

0 commit comments

Comments
 (0)