Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
AMQP-206 Support Rabbit HA
- Configure a list of addresses in ConnectionFactory
- Allow arguments (e.g. HA) for temporary reply queue creation
  • Loading branch information
garyrussell committed Jan 11, 2012
commit 0bdd3f3cdd6c103c995c62baa212b8a5a78625ad
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

/**
* @author Dave Syer
* @author Gary Russell
*/
class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {

Expand All @@ -32,6 +33,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {

private static final String PORT_ATTRIBUTE = "port";

private static final String ADDRESSES = "addresses";

private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host";

private static final String USER_ATTRIBUTE = "username";
Expand Down Expand Up @@ -63,6 +66,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
NamespaceUtils.setValueIfAttributeDefined(builder, element, USER_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, PASSWORD_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, VIRTUAL_HOST_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

/**
* @author Dave Syer
* @author Gary Russell
*/
class TemplateParser extends AbstractSingleBeanDefinitionParser {

Expand All @@ -41,6 +42,8 @@ class TemplateParser extends AbstractSingleBeanDefinitionParser {

private static final String CHANNEL_TRANSACTED_ATTRIBUTE = "channel-transacted";

private static final String REPLY_QUEUE_ARGUMENTS = "reply-queue-arguments";

@Override
protected Class<?> getBeanClass(Element element) {
return RabbitTemplate.class;
Expand Down Expand Up @@ -77,6 +80,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
NamespaceUtils.setValueIfAttributeDefined(builder, element, REPLY_TIMEOUT_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, ENCODING_ATTRIBUTE);
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, MESSAGE_CONVERTER_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, REPLY_QUEUE_ARGUMENTS);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.Assert;

import com.rabbitmq.client.Address;

/**
* @author Dave Syer
* @author Gary Russell
*
*/
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean {
Expand All @@ -36,6 +39,8 @@ public abstract class AbstractConnectionFactory implements ConnectionFactory, Di

private final CompositeChannelListener channelListener = new CompositeChannelListener();

private volatile Address[] addresses;

/**
* Create a new SingleConnectionFactory for the given target ConnectionFactory.
* @param rabbitConnectionFactory the target ConnectionFactory
Expand Down Expand Up @@ -77,6 +82,17 @@ public int getPort() {
return this.rabbitConnectionFactory.getPort();
}

/**
* Set addresses for clustering.
* @param addresses list of addresses with form "host[:port],..."
*/
public void setAddresses(String addresses) {
Address[] addressArray = Address.parseAddresses(addresses);
if (addressArray.length > 0) {
this.addresses = addressArray;
}
}

/**
* A composite connection listener to be used by subclasses when creating and closing connections.
*
Expand Down Expand Up @@ -113,6 +129,9 @@ public void addChannelListener(ChannelListener listener) {

final protected Connection createBareConnection() {
try {
if (this.addresses != null) {
return new SimpleConnection(this.rabbitConnectionFactory.newConnection(this.addresses));
}
return new SimpleConnection(this.rabbitConnectionFactory.newConnection());
} catch (IOException e) {
throw RabbitUtils.convertRabbitAccessException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package org.springframework.amqp.rabbit.core;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -73,6 +75,7 @@
* @author Mark Pollack
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor implements RabbitOperations {
Expand All @@ -98,7 +101,9 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations {

private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

private String encoding = DEFAULT_ENCODING;
private volatile String encoding = DEFAULT_ENCODING;

private volatile Map<String, Object> replyQueueArguments;

/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
Expand Down Expand Up @@ -205,6 +210,23 @@ public void setMessagePropertiesConverter(MessagePropertiesConverter messageProp
this.messagePropertiesConverter = messagePropertiesConverter;
}

/**
* @param replyQueueArguments the replyQueueArguments to set
*/
public void setReplyQueueArguments(Map<String, Object> replyQueueArguments) {
this.replyQueueArguments = replyQueueArguments;
}

public void setReplyQueueArguments(String arguments) {
Map<String, Object> map = new HashMap<String, Object>();
String[] entries = arguments.split(",");
for (String entry : entries) {
String[] keyVal = entry.split("=");
map.put(keyVal[0].trim(), keyVal[1].trim());
}
this.replyQueueArguments = map;
}

/**
* Return the message converter for this template. Useful for clients that want to take advantage of the converter
* in {@link ChannelCallback} implementations.
Expand Down Expand Up @@ -364,7 +386,7 @@ public Message doInRabbit(Channel channel) throws Exception {

Assert.isNull(message.getMessageProperties().getReplyTo(),
"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
DeclareOk queueDeclaration = channel.queueDeclare();
DeclareOk queueDeclaration = channel.queueDeclare("", false, true, true, replyQueueArguments);
String replyTo = queueDeclaration.getQueue();
message.getMessageProperties().setReplyTo(replyTo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,13 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reply-queue-arguments" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
List of arguments used when creating a reply queue in sendAndReceive(). Default none.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>

Expand Down Expand Up @@ -724,6 +731,13 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="addresses" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
List of addresses; e.g. host1,host2:4567,host3 - overrides host/port if supplied.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="username" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Expand Down