@@ -426,18 +426,17 @@ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
426426 AppendFuture <AppendEntryResponse > dledgerFuture ;
427427 EncodeResult encodeResult ;
428428
429- encodeResult = this .messageSerializer .serialize (msg );
430- if (encodeResult .status != AppendMessageStatus .PUT_OK ) {
431- return new PutMessageResult (PutMessageStatus .MESSAGE_ILLEGAL , new AppendMessageResult (encodeResult .status ));
432- }
433-
434429 putMessageLock .lock (); //spin or ReentrantLock ,depending on store config
435430 long elapsedTimeInLock ;
436431 long queueOffset ;
437432 try {
438433 beginTimeInDledgerLock = this .defaultMessageStore .getSystemClock ().now ();
434+ encodeResult = this .messageSerializer .serialize (msg );
439435 queueOffset = getQueueOffsetByKey (encodeResult .queueOffsetKey , tranType );
440- encodeResult .setQueueOffsetKey (queueOffset , false );
436+ encodeResult .setQueueOffsetKey (queueOffset );
437+ if (encodeResult .status != AppendMessageStatus .PUT_OK ) {
438+ return new PutMessageResult (PutMessageStatus .MESSAGE_ILLEGAL , new AppendMessageResult (encodeResult .status ));
439+ }
441440 AppendEntryRequest request = new AppendEntryRequest ();
442441 request .setGroup (dLedgerConfig .getGroup ());
443442 request .setRemoteId (dLedgerServer .getMemberState ().getSelfId ());
@@ -543,21 +542,19 @@ public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) {
543542 BatchAppendFuture <AppendEntryResponse > dledgerFuture ;
544543 EncodeResult encodeResult ;
545544
546- encodeResult = this .messageSerializer .serialize (messageExtBatch );
547- if (encodeResult .status != AppendMessageStatus .PUT_OK ) {
548- return new PutMessageResult (PutMessageStatus .MESSAGE_ILLEGAL , new AppendMessageResult (encodeResult
549- .status ));
550- }
551-
552545 putMessageLock .lock (); //spin or ReentrantLock ,depending on store config
553546 msgIdBuilder .setLength (0 );
554547 long elapsedTimeInLock ;
555548 long queueOffset ;
556549 long msgNum = 0 ;
557550 try {
558551 beginTimeInDledgerLock = this .defaultMessageStore .getSystemClock ().now ();
559- queueOffset = getQueueOffsetByKey (encodeResult .queueOffsetKey , tranType );
560- encodeResult .setQueueOffsetKey (queueOffset , true );
552+ encodeResult = this .messageSerializer .serialize (messageExtBatch );
553+ queueOffset = topicQueueTable .get (encodeResult .queueOffsetKey );
554+ if (encodeResult .status != AppendMessageStatus .PUT_OK ) {
555+ return new PutMessageResult (PutMessageStatus .MESSAGE_ILLEGAL , new AppendMessageResult (encodeResult
556+ .status ));
557+ }
561558 BatchAppendEntryRequest request = new BatchAppendEntryRequest ();
562559 request .setGroup (dLedgerConfig .getGroup ());
563560 request .setRemoteId (dLedgerServer .getMemberState ().getSelfId ());
@@ -667,7 +664,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner
667664 try {
668665 beginTimeInDledgerLock = this .defaultMessageStore .getSystemClock ().now ();
669666 queueOffset = getQueueOffsetByKey (encodeResult .queueOffsetKey , tranType );
670- encodeResult .setQueueOffsetKey (queueOffset , false );
667+ encodeResult .setQueueOffsetKey (queueOffset );
671668 AppendEntryRequest request = new AppendEntryRequest ();
672669 request .setGroup (dLedgerConfig .getGroup ());
673670 request .setRemoteId (dLedgerServer .getMemberState ().getSelfId ());
@@ -782,8 +779,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch mess
782779 long msgNum = 0 ;
783780 try {
784781 beginTimeInDledgerLock = this .defaultMessageStore .getSystemClock ().now ();
785- queueOffset = getQueueOffsetByKey (encodeResult .queueOffsetKey , tranType );
786- encodeResult .setQueueOffsetKey (queueOffset , true );
782+ queueOffset = topicQueueTable .get (encodeResult .queueOffsetKey );
787783 BatchAppendEntryRequest request = new BatchAppendEntryRequest ();
788784 request .setGroup (dLedgerConfig .getGroup ());
789785 request .setRemoteId (dLedgerServer .getMemberState ().getSelfId ());
@@ -961,15 +957,8 @@ public EncodeResult(AppendMessageStatus status, ByteBuffer data, String queueOff
961957 this .queueOffsetKey = queueOffsetKey ;
962958 }
963959
964- public void setQueueOffsetKey (long offset , boolean isBatch ) {
965- if (!isBatch ) {
966- this .data .putLong (MessageDecoder .QUEUE_OFFSET_POSITION , offset );
967- return ;
968- }
969-
970- for (byte [] data : batchData ) {
971- ByteBuffer .wrap (data ).putLong (MessageDecoder .QUEUE_OFFSET_POSITION , offset ++);
972- }
960+ public void setQueueOffsetKey (long offset ) {
961+ data .putLong (MessageDecoder .QUEUE_OFFSET_POSITION , offset );
973962 }
974963
975964 public byte [] getData () {
@@ -988,6 +977,8 @@ class MessageSerializer {
988977
989978 // The maximum length of the message
990979 private final int maxMessageSize ;
980+ // Build Message Key
981+ private final StringBuilder keyBuilder = new StringBuilder ();
991982
992983 MessageSerializer (final int size ) {
993984 this .maxMessageSize = size ;
@@ -1088,7 +1079,17 @@ public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
10881079 }
10891080
10901081 public EncodeResult serialize (final MessageExtBatch messageExtBatch ) {
1091- String key = messageExtBatch .getTopic () + "-" + messageExtBatch .getQueueId ();
1082+ keyBuilder .setLength (0 );
1083+ keyBuilder .append (messageExtBatch .getTopic ());
1084+ keyBuilder .append ('-' );
1085+ keyBuilder .append (messageExtBatch .getQueueId ());
1086+ String key = keyBuilder .toString ();
1087+
1088+ Long queueOffset = DLedgerCommitLog .this .topicQueueTable .get (key );
1089+ if (null == queueOffset ) {
1090+ queueOffset = 0L ;
1091+ DLedgerCommitLog .this .topicQueueTable .put (key , queueOffset );
1092+ }
10921093
10931094 int totalMsgLen = 0 ;
10941095 ByteBuffer messagesByteBuff = messageExtBatch .wrap ();
@@ -1153,7 +1154,7 @@ public EncodeResult serialize(final MessageExtBatch messageExtBatch) {
11531154 // 5 FLAG
11541155 msgStoreItemMemory .putInt (flag );
11551156 // 6 QUEUEOFFSET
1156- msgStoreItemMemory .putLong (0L );
1157+ msgStoreItemMemory .putLong (queueOffset ++ );
11571158 // 7 PHYSICALOFFSET
11581159 msgStoreItemMemory .putLong (0 );
11591160 // 8 SYSFLAG
@@ -1209,7 +1210,6 @@ public DLedgerSelectMappedBufferResult(SelectMmapBufferResult sbr) {
12091210 this .sbr = sbr ;
12101211 }
12111212
1212- @ Override
12131213 public synchronized void release () {
12141214 super .release ();
12151215 if (sbr != null ) {
0 commit comments