Skip to content

Commit f58dbc3

Browse files
authored
[ISSUE apache#2300] Enhancement: Benchmark support acl and msg trace (apache#2301)
* [Benchmark]Support acl, add msgTraceEnable option for producer and consumer, fix Algorithm HmacSHA1 not available using openjdk * add apache header * add aclEnable option
1 parent 73f2071 commit f58dbc3

File tree

5 files changed

+74
-6
lines changed

5 files changed

+74
-6
lines changed

distribution/benchmark/runclass.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPe
5656
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
5757
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_run_class_gc_%p_%t.log -XX:+PrintGCDetails"
5858
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
59-
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
6059
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
6160
JAVA_OPT="${JAVA_OPT} -XX:+PerfDisableSharedMem"
61+
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
6262
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
6363
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
6464

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.example.benchmark;
19+
20+
import org.apache.rocketmq.acl.common.AclClientRPCHook;
21+
import org.apache.rocketmq.acl.common.SessionCredentials;
22+
import org.apache.rocketmq.remoting.RPCHook;
23+
24+
public class AclClient {
25+
26+
private static final String ACL_ACCESS_KEY = "rocketmq2";
27+
28+
private static final String ACL_SECRET_KEY = "12345678";
29+
30+
static RPCHook getAclRPCHook() {
31+
return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
32+
}
33+
}

example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
3535
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
3636
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
37+
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
3738
import org.apache.rocketmq.client.exception.MQClientException;
3839
import org.apache.rocketmq.common.MixAll;
3940
import org.apache.rocketmq.common.filter.ExpressionType;
4041
import org.apache.rocketmq.common.message.MessageExt;
42+
import org.apache.rocketmq.remoting.RPCHook;
4143
import org.apache.rocketmq.srvutil.ServerUtil;
4244

4345
public class Consumer {
@@ -55,12 +57,16 @@ public static void main(String[] args) throws MQClientException, IOException {
5557
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
5658
final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
5759
final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
60+
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
61+
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
62+
5863
String group = groupPrefix;
5964
if (Boolean.parseBoolean(isSuffixEnable)) {
6065
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
6166
}
6267

63-
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s%n", topic, group, isSuffixEnable, filterType, expression);
68+
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
69+
topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
6470

6571
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
6672

@@ -111,7 +117,8 @@ public void run() {
111117
}
112118
}, 10000, 10000);
113119

114-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
120+
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
121+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
115122
if (commandLine.hasOption('n')) {
116123
String ns = commandLine.getOptionValue('n');
117124
consumer.setNamesrvAddr(ns);
@@ -192,6 +199,14 @@ public static Options buildCommandlineOptions(final Options options) {
192199
opt.setRequired(false);
193200
options.addOption(opt);
194201

202+
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
203+
opt.setRequired(false);
204+
options.addOption(opt);
205+
206+
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
207+
opt.setRequired(false);
208+
options.addOption(opt);
209+
195210
return options;
196211
}
197212

example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.rocketmq.client.producer.DefaultMQProducer;
3636
import org.apache.rocketmq.logging.InternalLogger;
3737
import org.apache.rocketmq.common.message.Message;
38+
import org.apache.rocketmq.remoting.RPCHook;
3839
import org.apache.rocketmq.remoting.common.RemotingHelper;
3940
import org.apache.rocketmq.remoting.exception.RemotingException;
4041
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -53,8 +54,11 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
5354
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
5455
final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
5556
final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
57+
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
58+
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
5659

57-
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
60+
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s traceEnable %s aclEnable %s%n",
61+
topic, threadCount, messageSize, keyEnable, msgTraceEnable, aclEnable);
5862

5963
final InternalLogger log = ClientLogger.getLog();
6064

@@ -100,7 +104,8 @@ public void run() {
100104
}
101105
}, 10000, 10000);
102106

103-
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
107+
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
108+
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
104109
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
105110

106111
if (commandLine.hasOption('n')) {
@@ -210,6 +215,14 @@ public static Options buildCommandlineOptions(final Options options) {
210215
opt.setRequired(false);
211216
options.addOption(opt);
212217

218+
opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
219+
opt.setRequired(false);
220+
options.addOption(opt);
221+
222+
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
223+
opt.setRequired(false);
224+
options.addOption(opt);
225+
213226
return options;
214227
}
215228

example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static void main(String[] args) throws MQClientException, UnsupportedEnco
6868
config.checkUnknownRate = commandLine.hasOption("cu") ? Double.parseDouble(commandLine.getOptionValue("cu")) : 0.0;
6969
config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
7070
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
71+
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
7172

7273
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
7374

@@ -122,7 +123,8 @@ public void run() {
122123
}, 10000, 10000);
123124

124125
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
125-
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
126+
final TransactionMQProducer producer =
127+
new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null);
126128
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
127129
producer.setTransactionListener(transactionCheckListener);
128130
producer.setDefaultTopicQueueNums(1000);
@@ -250,6 +252,10 @@ public static Options buildCommandlineOptions(final Options options) {
250252
opt.setRequired(false);
251253
options.addOption(opt);
252254

255+
opt = new Option("a", "aclEnable", true, "Acl Enable, Default: false");
256+
opt.setRequired(false);
257+
options.addOption(opt);
258+
253259
return options;
254260
}
255261
}
@@ -432,6 +438,7 @@ class TxSendConfig {
432438
double checkUnknownRate;
433439
long batchId;
434440
int sendInterval;
441+
boolean aclEnable;
435442
}
436443

437444
class LRUMap<K, V> extends LinkedHashMap<K, V> {

0 commit comments

Comments
 (0)