Skip to content

Commit 7251093

Browse files
author
centmeng
committed
ActiveMQ
1 parent 3392c39 commit 7251093

11 files changed

+241
-0
lines changed

src/com/msj/activemq/Receiver.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.msj.activemq;
2+
3+
import org.apache.activemq.ActiveMQConnectionFactory;
4+
5+
import javax.jms.*;
6+
7+
public class Receiver {
8+
9+
public static void main(String[] args) {
10+
try {
11+
//第一步:建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认即可,默认端口为"tcp://localhost:61616"
12+
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
13+
ActiveMQConnectionFactory.DEFAULT_USER,
14+
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
15+
"failover:(tcp://192.168.1.111:51511,tcp://192.168.1.111:51512,tcp://192.168.1.111:51513)?Randomize=false");
16+
17+
//第二步:通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。
18+
Connection connection = connectionFactory.createConnection();
19+
connection.start();
20+
21+
//第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,一般我们设置自动签收。
22+
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
23+
24+
//第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。在程序中可以使用多个Queue和Topic。
25+
Destination destination = session.createQueue("first");
26+
//第五步:通过Session创建MessageConsumer
27+
MessageConsumer consumer = session.createConsumer(destination);
28+
29+
while(true){
30+
TextMessage msg = (TextMessage)consumer.receive();
31+
if(msg == null) break;
32+
System.out.println("收到的内容:" + msg.getText());
33+
}
34+
} catch (Exception e) {
35+
e.printStackTrace();
36+
}
37+
38+
39+
40+
41+
}
42+
}

src/com/msj/activemq/Sender.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.msj.activemq;
2+
3+
import org.apache.activemq.ActiveMQConnectionFactory;
4+
5+
import javax.jms.*;
6+
7+
public class Sender {
8+
9+
public static void main(String[] args) {
10+
try {
11+
//第一步:建立ConnectionFactory工厂对象,需要填入用户名、密码、以及要连接的地址,均使用默认即可,默认端口为"tcp://localhost:61616"
12+
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
13+
ActiveMQConnectionFactory.DEFAULT_USER,
14+
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
15+
"failover:(tcp://192.168.1.111:51511,tcp://192.168.1.111:51512,tcp://192.168.1.111:51513)?Randomize=false");
16+
17+
//第二步:通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法开启连接,Connection默认是关闭的。
18+
Connection connection = connectionFactory.createConnection();
19+
connection.start();
20+
21+
//第三步:通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,一般我们设置自动签收。
22+
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
23+
24+
//第四步:通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称作Queue即队列;在Pub/Sub模式,Destination被称作Topic即主题。在程序中可以使用多个Queue和Topic。
25+
Destination destination = session.createQueue("first");
26+
27+
//第五步:我们需要通过Session对象创建消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer。
28+
MessageProducer producer = session.createProducer(destination);
29+
30+
//第六步:我们可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode),我们稍后详细介绍。
31+
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
32+
33+
//第七步:最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection连接。
34+
35+
for(int i = 0 ; i < 500000 ; i ++){
36+
TextMessage msg = session.createTextMessage("我是消息内容" + i);
37+
// 第一个参数目标地址
38+
// 第二个参数 具体的数据信息
39+
// 第三个参数 传送数据的模式
40+
// 第四个参数 优先级
41+
// 第五个参数 消息的过期时间
42+
producer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 0 , 1000L);
43+
System.out.println("发送消息:" + msg.getText());
44+
Thread.sleep(1000);
45+
46+
}
47+
48+
if(connection != null){
49+
connection.close();
50+
}
51+
} catch (Exception e) {
52+
e.printStackTrace();
53+
}
54+
55+
}
56+
}
201 KB
Binary file not shown.
Binary file not shown.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
<!--
2+
Licensed to the Apache Software Foundation (ASF) under one or more
3+
contributor license agreements. See the NOTICE file distributed with
4+
this work for additional information regarding copyright ownership.
5+
The ASF licenses this file to You under the Apache License, Version 2.0
6+
(the "License"); you may not use this file except in compliance with
7+
the License. You may obtain a copy of the License at
8+
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
See the License for the specific language governing permissions and
15+
limitations under the License.
16+
-->
17+
<!-- START SNIPPET: example -->
18+
<beans
19+
xmlns="http://www.springframework.org/schema/beans"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
22+
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
23+
24+
<!-- Allows us to use system properties as variables in this configuration file -->
25+
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
26+
<property name="locations">
27+
<value>file:${activemq.conf}/credentials.properties</value>
28+
</property>
29+
</bean>
30+
31+
<!-- Allows accessing the server log -->
32+
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
33+
lazy-init="false" scope="singleton"
34+
init-method="start" destroy-method="stop">
35+
</bean>
36+
37+
<!--
38+
The <broker> element is used to configure the ActiveMQ broker.
39+
-->
40+
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
41+
42+
<destinationPolicy>
43+
<policyMap>
44+
<policyEntries>
45+
<policyEntry topic=">" >
46+
<!-- The constantPendingMessageLimitStrategy is used to prevent
47+
slow topic consumers to block producers and affect other consumers
48+
by limiting the number of messages that are retained
49+
For more information, see:
50+
51+
http://activemq.apache.org/slow-consumer-handling.html
52+
53+
-->
54+
<pendingMessageLimitStrategy>
55+
<constantPendingMessageLimitStrategy limit="1000"/>
56+
</pendingMessageLimitStrategy>
57+
</policyEntry>
58+
</policyEntries>
59+
</policyMap>
60+
</destinationPolicy>
61+
62+
63+
<!--
64+
The managementContext is used to configure how ActiveMQ is exposed in
65+
JMX. By default, ActiveMQ uses the MBean server that is started by
66+
the JVM. For more information, see:
67+
68+
http://activemq.apache.org/jmx.html
69+
-->
70+
<managementContext>
71+
<managementContext createConnector="false"/>
72+
</managementContext>
73+
74+
<!--
75+
Configure message persistence for the broker. The default persistence
76+
mechanism is the KahaDB store (identified by the kahaDB tag).
77+
For more information, see:
78+
79+
http://activemq.apache.org/persistence.html
80+
-->
81+
<persistenceAdapter>
82+
<kahaDB directory="${activemq.data}/kahadb"/>
83+
</persistenceAdapter>
84+
85+
86+
<!--
87+
The systemUsage controls the maximum amount of space the broker will
88+
use before disabling caching and/or slowing down producers. For more information, see:
89+
http://activemq.apache.org/producer-flow-control.html
90+
-->
91+
<systemUsage>
92+
<systemUsage>
93+
<memoryUsage>
94+
<memoryUsage percentOfJvmHeap="70" />
95+
</memoryUsage>
96+
<storeUsage>
97+
<storeUsage limit="100 gb"/>
98+
</storeUsage>
99+
<tempUsage>
100+
<tempUsage limit="50 gb"/>
101+
</tempUsage>
102+
</systemUsage>
103+
</systemUsage>
104+
105+
<!--
106+
The transport connectors expose ActiveMQ over a given protocol to
107+
clients and other brokers. For more information, see:
108+
109+
http://activemq.apache.org/configuring-transports.html
110+
-->
111+
<transportConnectors>
112+
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
113+
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
114+
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
115+
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
116+
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
117+
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
118+
</transportConnectors>
119+
120+
<!-- destroy the spring context on shutdown to stop jetty -->
121+
<shutdownHooks>
122+
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
123+
</shutdownHooks>
124+
125+
<plugins>
126+
<simpleAuthenticationPlugin>
127+
<users>
128+
<authenticationUser username="bhz" password="bhz" groups="users,admins"/>
129+
</users>
130+
</simpleAuthenticationPlugin>
131+
</plugins>
132+
</broker>
133+
134+
<!--
135+
Enable web consoles, REST and Ajax APIs and demos
136+
The web consoles requires by default login, you can disable this in the jetty.xml file
137+
138+
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
139+
-->
140+
<import resource="jetty.xml"/>
141+
142+
</beans>
143+
<!-- END SNIPPET: example -->
460 KB
Binary file not shown.
841 KB
Loading
1.17 MB
Loading
1.51 MB
Loading
472 KB
Loading

0 commit comments

Comments
 (0)