Skip to content

Commit 3be7033

Browse files
maixiaohaizhangxu16
andauthored
[ISSUE apache#2146] Add benchmark shutdown script, add more print info, add consumer threand count command option (apache#2150)
Co-authored-by: zhangxu16 <[email protected]>
1 parent fd7da35 commit 3be7033

File tree

4 files changed

+78
-8
lines changed

4 files changed

+78
-8
lines changed

distribution/benchmark/shutdown.sh

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/bin/sh
2+
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
case $1 in
19+
producer)
20+
21+
pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Producer' |grep java | grep -v grep | awk '{print $1}'`
22+
if [ -z "$pid" ] ; then
23+
echo "No benchmark producer running."
24+
exit -1;
25+
fi
26+
27+
echo "The benchmkar producer(${pid}) is running..."
28+
29+
kill ${pid}
30+
31+
echo "Send shutdown request to benchmark producer(${pid}) OK"
32+
;;
33+
consumer)
34+
35+
pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.Consumer' |grep java | grep -v grep | awk '{print $1}'`
36+
if [ -z "$pid" ] ; then
37+
echo "No benchmark consumer running."
38+
exit -1;
39+
fi
40+
41+
echo "The benchmark consumer(${pid}) is running..."
42+
43+
kill ${pid}
44+
45+
echo "Send shutdown request to benchmark consumer(${pid}) OK"
46+
;;
47+
tproducer)
48+
49+
pid=`ps ax | grep -i 'org.apache.rocketmq.example.benchmark.TransactionProducer' |grep java | grep -v grep | awk '{print $1}'`
50+
if [ -z "$pid" ] ; then
51+
echo "No benchmark transaction producer running."
52+
exit -1;
53+
fi
54+
55+
echo "The benchmkar transaction producer(${pid}) is running..."
56+
57+
kill ${pid}
58+
59+
echo "Send shutdown request to benchmark transaction producer(${pid}) OK"
60+
;;
61+
*)
62+
echo "Useage: shutdown producer | consumer | tproducer"
63+
esac

example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static void main(String[] args) throws MQClientException, IOException {
5252
}
5353

5454
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
55+
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20;
5556
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
5657
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
5758
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
@@ -65,8 +66,8 @@ public static void main(String[] args) throws MQClientException, IOException {
6566
group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
6667
}
6768

68-
System.out.printf("topic: %s, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
69-
topic, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
69+
System.out.printf("topic: %s, threadCount %d, group: %s, suffix: %s, filterType: %s, expression: %s, msgTraceEnable: %s, aclEnable: %s%n",
70+
topic, threadCount, group, isSuffixEnable, filterType, expression, msgTraceEnable, aclEnable);
7071

7172
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
7273

@@ -101,8 +102,8 @@ private void printStats() {
101102
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
102103
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
103104

104-
System.out.printf("TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n",
105-
consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
105+
System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
106+
System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax
106107
);
107108
}
108109
}
@@ -123,6 +124,8 @@ public void run() {
123124
String ns = commandLine.getOptionValue('n');
124125
consumer.setNamesrvAddr(ns);
125126
}
127+
consumer.setConsumeThreadMin(threadCount);
128+
consumer.setConsumeThreadMax(threadCount);
126129
consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
127130

128131
if (filterType == null || expression == null) {
@@ -179,6 +182,10 @@ public static Options buildCommandlineOptions(final Options options) {
179182
opt.setRequired(false);
180183
options.addOption(opt);
181184

185+
opt = new Option("w", "threadCount", true, "Thread count, Default: 20");
186+
opt.setRequired(false);
187+
options.addOption(opt);
188+
182189
opt = new Option("g", "group", true, "Consumer group name, Default: benchmark_consumer");
183190
opt.setRequired(false);
184191
options.addOption(opt);

example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ private void printStats() {
8989
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
9090
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
9191

92-
System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n",
93-
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
92+
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
93+
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
9494
}
9595
}
9696

example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ private void printStats() {
105105
final long dupCheck = end.duplicatedCheck - begin.duplicatedCheck;
106106

107107
System.out.printf(
108-
"Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
109-
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
108+
"Current Time: %s Send TPS:%5d Max RT(ms):%5d AVG RT(ms):%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n",
109+
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, failCount, checkCount,
110110
unexpectedCheck, dupCheck);
111111
statsBenchmark.getSendMessageMaxRT().set(0);
112112
}

0 commit comments

Comments
 (0)