diff --git a/spring-amqp-parent/pom.xml b/spring-amqp-parent/pom.xml index e544d053c9..1f0ba8a940 100644 --- a/spring-amqp-parent/pom.xml +++ b/spring-amqp-parent/pom.xml @@ -20,7 +20,7 @@ 1.8.4 1.4.3 1.5.3 - 2.5.0 + 2.7.1 3.0.5.RELEASE diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java index a5a65cd983..007c31158d 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java @@ -21,6 +21,7 @@ /** * @author Dave Syer + * @author Gary Russell */ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser { @@ -32,6 +33,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser { private static final String PORT_ATTRIBUTE = "port"; + private static final String ADDRESSES = "addresses"; + private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host"; private static final String USER_ATTRIBUTE = "username"; @@ -63,6 +66,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit NamespaceUtils.setValueIfAttributeDefined(builder, element, USER_ATTRIBUTE); NamespaceUtils.setValueIfAttributeDefined(builder, element, PASSWORD_ATTRIBUTE); NamespaceUtils.setValueIfAttributeDefined(builder, element, VIRTUAL_HOST_ATTRIBUTE); + NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueArgumentsParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueArgumentsParser.java new file mode 100644 index 0000000000..e1eaaeed0c --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueArgumentsParser.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.amqp.rabbit.config; + +import java.util.Map; + +import org.springframework.beans.factory.config.MapFactoryBean; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.w3c.dom.Element; + +/** + * @author Gary Russell + * @since 1.0.1 + * + */ +class QueueArgumentsParser extends AbstractSingleBeanDefinitionParser { + + @Override + protected void doParse(Element element, ParserContext parserContext, + BeanDefinitionBuilder builder) { + Map map = parserContext.getDelegate().parseMapElement(element, + builder.getRawBeanDefinition()); + builder.addPropertyValue("sourceMap", map); + } + + @Override + protected String getBeanClassName(Element element) { + return MapFactoryBean.class.getName(); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java index 8cbeb0d871..e3ab0dba4f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/QueueParser.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; import org.w3c.dom.Element; @@ -29,7 +30,7 @@ */ public class QueueParser extends AbstractSingleBeanDefinitionParser { - private static final String ARGUMENTS_ELEMENT = "queue-arguments"; + private static final String ARGUMENTS = "queue-arguments"; // element OR attribute private static final String DURABLE_ATTRIBUTE = "durable"; private static final String EXCLUSIVE_ATTRIBUTE = "exclusive"; private static final String AUTO_DELETE_ATTRIBUTE = "auto-delete"; @@ -79,13 +80,25 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit } - Element argumentsElement = DomUtils.getChildElementByTagName(element, ARGUMENTS_ELEMENT); + String queueArguments = element.getAttribute(ARGUMENTS); + Element argumentsElement = DomUtils.getChildElementByTagName(element, ARGUMENTS); + if (argumentsElement != null) { + if (StringUtils.hasText(queueArguments)) { + parserContext + .getReaderContext() + .error("Queue may have either a queue-attributes attribute or element, but not both", + element); + } Map map = parserContext.getDelegate().parseMapElement(argumentsElement, builder.getRawBeanDefinition()); builder.addConstructorArgValue(map); } + if (StringUtils.hasText(queueArguments)) { + builder.addConstructorArgReference(queueArguments); + } + } private boolean attributeHasIllegalOverride(Element element, String name, String allowed) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceHandler.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceHandler.java index b1060aaf53..1a660aa064 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceHandler.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceHandler.java @@ -23,6 +23,7 @@ * * @author Mark Pollack * @author Mark Fisher + * @author Gary Russell * @since 1.0 */ public class RabbitNamespaceHandler extends NamespaceHandlerSupport { @@ -37,6 +38,7 @@ public void init() { registerBeanDefinitionParser("admin", new AdminParser()); registerBeanDefinitionParser("connection-factory", new ConnectionFactoryParser()); registerBeanDefinitionParser("template", new TemplateParser()); + registerBeanDefinitionParser("queue-arguments", new QueueArgumentsParser()); } } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java index 5baaee6eca..e3f2a84ab7 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/TemplateParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2011 the original author or authors. + * Copyright 2010-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -22,6 +22,7 @@ /** * @author Dave Syer + * @author Gary Russell */ class TemplateParser extends AbstractSingleBeanDefinitionParser { @@ -41,6 +42,8 @@ class TemplateParser extends AbstractSingleBeanDefinitionParser { private static final String CHANNEL_TRANSACTED_ATTRIBUTE = "channel-transacted"; + private static final String REPLY_QUEUE = "reply-queue"; + @Override protected Class getBeanClass(Element element) { return RabbitTemplate.class; @@ -77,6 +80,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit NamespaceUtils.setValueIfAttributeDefined(builder, element, REPLY_TIMEOUT_ATTRIBUTE); NamespaceUtils.setValueIfAttributeDefined(builder, element, ENCODING_ATTRIBUTE); NamespaceUtils.setReferenceIfAttributeDefined(builder, element, MESSAGE_CONVERTER_ATTRIBUTE); + NamespaceUtils.setReferenceIfAttributeDefined(builder, element, REPLY_QUEUE); } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java index 98a24205eb..6a9b880fe8 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java @@ -22,8 +22,11 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.util.Assert; +import com.rabbitmq.client.Address; + /** * @author Dave Syer + * @author Gary Russell * */ public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean { @@ -36,6 +39,8 @@ public abstract class AbstractConnectionFactory implements ConnectionFactory, Di private final CompositeChannelListener channelListener = new CompositeChannelListener(); + private volatile Address[] addresses; + /** * Create a new SingleConnectionFactory for the given target ConnectionFactory. * @param rabbitConnectionFactory the target ConnectionFactory @@ -77,6 +82,17 @@ public int getPort() { return this.rabbitConnectionFactory.getPort(); } + /** + * Set addresses for clustering. + * @param addresses list of addresses with form "host[:port],..." + */ + public void setAddresses(String addresses) { + Address[] addressArray = Address.parseAddresses(addresses); + if (addressArray.length > 0) { + this.addresses = addressArray; + } + } + /** * A composite connection listener to be used by subclasses when creating and closing connections. * @@ -113,6 +129,9 @@ public void addChannelListener(ChannelListener listener) { final protected Connection createBareConnection() { try { + if (this.addresses != null) { + return new SimpleConnection(this.rabbitConnectionFactory.newConnection(this.addresses)); + } return new SimpleConnection(this.rabbitConnectionFactory.newConnection()); } catch (IOException e) { throw RabbitUtils.convertRabbitAccessException(e); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java index ed5ac44244..dd54c0c021 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2011 the original author or authors. + * Copyright 2002-2012 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,15 +14,20 @@ package org.springframework.amqp.rabbit.core; import java.io.IOException; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.springframework.amqp.AmqpException; import org.springframework.amqp.AmqpIllegalStateException; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils; import org.springframework.amqp.rabbit.connection.RabbitAccessor; @@ -73,9 +78,10 @@ * @author Mark Pollack * @author Mark Fisher * @author Dave Syer + * @author Gary Russell * @since 1.0 */ -public class RabbitTemplate extends RabbitAccessor implements RabbitOperations { +public class RabbitTemplate extends RabbitAccessor implements RabbitOperations, MessageListener { private static final String DEFAULT_EXCHANGE = ""; // alias for amq.direct default exchange @@ -98,7 +104,11 @@ public class RabbitTemplate extends RabbitAccessor implements RabbitOperations { private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter(); - private String encoding = DEFAULT_ENCODING; + private volatile String encoding = DEFAULT_ENCODING; + + private volatile Queue replyQueue; + + private final Map> replyHolder = new ConcurrentHashMap>(); /** * Convenient constructor for use with setter injection. Don't forget to set the connection factory. @@ -164,6 +174,17 @@ public void setEncoding(String encoding) { this.encoding = encoding; } + + /** + * A queue for replies; if not provided, a temporary exclusive, auto-delete queue will + * be used for each reply. + * + * @param replyQueue the replyQueue to set + */ + public void setReplyQueue(Queue replyQueue) { + this.replyQueue = replyQueue; + } + /** * Specify the timeout in milliseconds to be used when waiting for a reply Message when using one of the * sendAndReceive methods. The default value is defined as {@link #DEFAULT_REPLY_TIMEOUT}. A negative value @@ -358,6 +379,15 @@ public Object convertSendAndReceive(final String exchange, final String routingK * @return the message that is received in reply */ protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) { + if (this.replyQueue == null) { + return doSendAndReceiveWithTemporary(exchange, routingKey, message); + } + else { + return doSendAndReceiveWithFixed(exchange, routingKey, message); + } + } + + protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) { Message replyMessage = this.execute(new ChannelCallback() { public Message doInRabbit(Channel channel) throws Exception { final SynchronousQueue replyHandoff = new SynchronousQueue(); @@ -397,6 +427,31 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp return replyMessage; } + protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message) { + Message replyMessage = this.execute(new ChannelCallback() { + public Message doInRabbit(Channel channel) throws Exception { + final LinkedBlockingQueue replyHandoff = new LinkedBlockingQueue(); + String messageTag = UUID.randomUUID().toString(); + RabbitTemplate.this.replyHolder.put(messageTag, replyHandoff); + + Assert.isNull(message.getMessageProperties().getReplyTo(), + "Send-and-receive methods can only be used if the Message does not already have a replyTo property."); + message.getMessageProperties().setReplyTo(RabbitTemplate.this.replyQueue.getName()); + message.getMessageProperties().setHeader("spring_reply_correlation", messageTag); + + if (logger.isDebugEnabled()) { + logger.debug("Sending message with tag " + messageTag); + } + doSend(channel, exchange, routingKey, message); + Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout, + TimeUnit.MILLISECONDS); + RabbitTemplate.this.replyHolder.remove(messageTag); + return reply; + } + }); + return replyMessage; + } + public T execute(ChannelCallback action) { Assert.notNull(action, "Callback object must not be null"); RabbitResourceHolder resourceHolder = getTransactionalResourceHolder(); @@ -480,4 +535,23 @@ private String getRequiredQueue() throws IllegalStateException { return name; } + public void onMessage(Message message) { + String messageTag = (String) message.getMessageProperties().getHeaders().get("spring_reply_correlation"); + if (messageTag == null) { + logger.error("No correlation header in reply"); + return; + } + LinkedBlockingQueue queue = this.replyHolder.get(messageTag); + if (queue == null) { + if (logger.isWarnEnabled()) { + logger.warn("Reply received after timeout for " + messageTag); + } + return; + } + queue.add(message); + if (logger.isDebugEnabled()) { + logger.debug("Reply received for " + messageTag); + } + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java index a8713d587d..5247789899 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java @@ -70,6 +70,8 @@ public class BlockingQueueConsumer { private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final AtomicBoolean cancelReceived = new AtomicBoolean(false); + private final AcknowledgeMode acknowledgeMode; private final ConnectionFactory connectionFactory; @@ -166,7 +168,12 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi logger.debug("Retrieving delivery for " + this); } checkShutdown(); - return handle(queue.poll(timeout, TimeUnit.MILLISECONDS)); + Message message = handle(queue.poll(timeout, TimeUnit.MILLISECONDS)); + if (message == null && cancelReceived.get()) { + cancelReceived.set(false); + throw new ConsumerCancelledException(); + } + return message; } public void start() throws AmqpException { @@ -178,20 +185,36 @@ public void start() throws AmqpException { this.consumer = new InternalConsumer(channel); this.deliveryTags.clear(); this.activeObjectCounter.add(this); - try { - if (!acknowledgeMode.isAutoAck()) { - // Set basicQos before calling basicConsume (otherwise if we are not acking the broker - // will send blocks of 100 messages) - channel.basicQos(prefetchCount); - } - for (int i = 0; i < queues.length; i++) { - channel.queueDeclarePassive(queues[i]); + int passiveDeclareTries = 3; // mirrored queue might be being moved + do { + try { + if (!acknowledgeMode.isAutoAck()) { + // Set basicQos before calling basicConsume (otherwise if we are not acking the broker + // will send blocks of 100 messages) + channel.basicQos(prefetchCount); + } + for (int i = 0; i < queues.length; i++) { + channel.queueDeclarePassive(queues[i]); + } + passiveDeclareTries = 0; + } catch (IOException e) { + if (passiveDeclareTries > 0) { + if (logger.isWarnEnabled()) { + logger.warn("Reconnect failed; retries left=" + (passiveDeclareTries-1), e); + try { + Thread.sleep(5000); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } + } + } else { + this.activeObjectCounter.release(this); + throw new FatalListenerStartupException("Cannot prepare queue for listener. " + + "Either the queue doesn't exist or the broker will not allow us to use it.", e); + } } - } catch (IOException e) { - this.activeObjectCounter.release(this); - throw new FatalListenerStartupException("Cannot prepare queue for listener. " - + "Either the queue doesn't exist or the broker will not allow us to use it.", e); - } + } while (passiveDeclareTries-- > 0); + try { for (int i = 0; i < queues.length; i++) { channel.basicConsume(queues[i], acknowledgeMode.isAutoAck(), consumer); @@ -207,12 +230,21 @@ public void start() throws AmqpException { public void stop() { cancelled.set(true); if (consumer != null && consumer.getChannel() != null && consumer.getConsumerTag() != null) { - RabbitUtils.closeMessageConsumer(consumer.getChannel(), consumer.getConsumerTag(), transactional); + try { + RabbitUtils.closeMessageConsumer(consumer.getChannel(), consumer.getConsumerTag(), transactional); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Error closing consumer", e); + } + } + } + if (logger.isDebugEnabled()) { + logger.debug("Closing Rabbit Channel: " + channel); } - logger.debug("Closing Rabbit Channel: " + channel); // This one never throws exceptions... RabbitUtils.closeChannel(channel); deliveryTags.clear(); + consumer = null; } private class InternalConsumer extends DefaultConsumer { @@ -231,6 +263,14 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig deliveryTags.clear(); } + @Override + public void handleCancel(String consumerTag) throws IOException { + if (logger.isWarnEnabled()) { + logger.warn("Cancel received"); + } + cancelReceived.set(true); + } + @Override public void handleCancelOk(String consumerTag) { if (logger.isDebugEnabled()) { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConsumerCancelledException.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConsumerCancelledException.java new file mode 100644 index 0000000000..4936c43f2c --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/ConsumerCancelledException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.amqp.rabbit.listener; + +/** + * Thrown when the broker cancels the consumer and the message + * queue is drained. + * + * @author Gary Russell + * @since 1.0.1 + * + */ +public class ConsumerCancelledException extends RuntimeException { + + private static final long serialVersionUID = 1L; + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index 4cbc1e8232..61a93796f9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -29,7 +29,7 @@ import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.impl.LongString; +import com.rabbitmq.client.LongString; /** * Default implementation of the {@link MessagePropertiesConverter} strategy. diff --git a/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.0.xsd b/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.0.xsd index 6255d7e18f..09500637cd 100644 --- a/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.0.xsd +++ b/spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.0.xsd @@ -56,6 +56,18 @@ ]]> + + + element. + ]]> + + + + + + + @@ -338,6 +350,7 @@ + + + + for replies; optional; if not supplied, methods expecting replies + will use a temporary, exclusive, auto-delete queue. + ]]> + + + + + + + @@ -724,6 +750,13 @@ ]]> + + + + + args = (Map) ctx.getBean("args"); + assertEquals("bar", args.get("foo")); + + assertEquals("qux", queue1.getArguments().get("baz")); + assertEquals("bar", queue2.getArguments().get("foo")); + } + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/QueueParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/QueueParserTests.java index 4116b3e451..a54433f258 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/QueueParserTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/QueueParserTests.java @@ -100,6 +100,13 @@ public void testAnonymousArgumentsQueue() throws Exception { assertEquals("spam", queue.getArguments().get("foo")); } + @Test + public void testReferencedArgumentsQueue() throws Exception { + Queue queue = beanFactory.getBean("referencedArguments", Queue.class); + assertNotNull(queue); + assertEquals("qux", queue.getArguments().get("baz")); + } + @Test(expected=BeanDefinitionStoreException.class) public void testIllegalAnonymousQueue() throws Exception { beanFactory = new XmlBeanFactory(new ClassPathResource(getClass().getSimpleName() diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/TemplateParserTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/TemplateParserTests.java index d8de26a74b..06d93db4ba 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/TemplateParserTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/TemplateParserTests.java @@ -13,20 +13,25 @@ package org.springframework.amqp.rabbit.config; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Map; + import org.junit.Before; import org.junit.Test; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.SerializerMessageConverter; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.xml.XmlBeanFactory; import org.springframework.core.io.ClassPathResource; /** * * @author Dave Syer + * @author Gary Russell * */ public final class TemplateParserTests { @@ -50,5 +55,24 @@ public void testKitchenSink() throws Exception { assertNotNull(template); assertTrue(template.getMessageConverter() instanceof SerializerMessageConverter); } - + + @Test + public void testWithArgs() throws Exception { + RabbitTemplate template = beanFactory.getBean("withArgs", RabbitTemplate.class); + assertNotNull(template); + DirectFieldAccessor dfa = new DirectFieldAccessor(template); + Map args = (Map) dfa.getPropertyValue("replyQueueArguments"); + assertNotNull(args); + assertEquals("bar", args.get("foo")); + } + + @Test + public void testWithAnonArgs() throws Exception { + RabbitTemplate template = beanFactory.getBean("withAnonArgs", RabbitTemplate.class); + assertNotNull(template); + DirectFieldAccessor dfa = new DirectFieldAccessor(template); + Map args = (Map) dfa.getPropertyValue("replyQueueArguments"); + assertNotNull(args); + assertEquals("qux", args.get("baz")); + } } diff --git a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueArgumentsParserTests-context.xml b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueArgumentsParserTests-context.xml new file mode 100644 index 0000000000..195f63056d --- /dev/null +++ b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueArgumentsParserTests-context.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + diff --git a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserPlaceholderTests-context.xml b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserPlaceholderTests-context.xml index 481980cf39..f42edfbe2a 100644 --- a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserPlaceholderTests-context.xml +++ b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserPlaceholderTests-context.xml @@ -38,4 +38,10 @@ + + + + + + diff --git a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserTests-context.xml b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserTests-context.xml index 3a0565ee64..4e04e89aff 100644 --- a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserTests-context.xml +++ b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/QueueParserTests-context.xml @@ -28,4 +28,10 @@ + + + + + + diff --git a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/TemplateParserTests-context.xml b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/TemplateParserTests-context.xml index 275fa8815c..75d7d366ac 100644 --- a/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/TemplateParserTests-context.xml +++ b/spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/TemplateParserTests-context.xml @@ -1,7 +1,7 @@ @@ -10,7 +10,19 @@ encoding="UTF-8" exchange="foo" queue="bar" routing-key="spam" message-converter="converter" reply-timeout="1000" /> - + + + + + + + + + + + + +