Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
AMQP-206 ReConnect To Mirrored Queue
If a consumer is connected to a mirror (slave), and the
master dies, the queue is moved. The consumer is given a
cancel notification, indicating we need to reconnect.

However, the move might not be complete, so we retry the
queue declaration a number of times before failing.
  • Loading branch information
garyrussell committed Jan 27, 2012
commit b20994af0a2d38b38e417d59b005b9990613240a
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class BlockingQueueConsumer {

private final AtomicBoolean cancelled = new AtomicBoolean(false);

private final AtomicBoolean cancelReceived = new AtomicBoolean(false);

private final AcknowledgeMode acknowledgeMode;

private final ConnectionFactory connectionFactory;
Expand Down Expand Up @@ -166,7 +168,12 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
logger.debug("Retrieving delivery for " + this);
}
checkShutdown();
return handle(queue.poll(timeout, TimeUnit.MILLISECONDS));
Message message = handle(queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && cancelReceived.get()) {
cancelReceived.set(false);
throw new ConsumerCancelledException();
}
return message;
}

public void start() throws AmqpException {
Expand All @@ -178,20 +185,36 @@ public void start() throws AmqpException {
this.consumer = new InternalConsumer(channel);
this.deliveryTags.clear();
this.activeObjectCounter.add(this);
try {
if (!acknowledgeMode.isAutoAck()) {
// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
// will send blocks of 100 messages)
channel.basicQos(prefetchCount);
}
for (int i = 0; i < queues.length; i++) {
channel.queueDeclarePassive(queues[i]);
int passiveDeclareTries = 3; // mirrored queue might be being moved
do {
try {
if (!acknowledgeMode.isAutoAck()) {
// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
// will send blocks of 100 messages)
channel.basicQos(prefetchCount);
}
for (int i = 0; i < queues.length; i++) {
channel.queueDeclarePassive(queues[i]);
}
passiveDeclareTries = 0;
} catch (IOException e) {
if (passiveDeclareTries > 0) {
if (logger.isWarnEnabled()) {
logger.warn("Reconnect failed; retries left=" + passiveDeclareTries, e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
}
} else {
this.activeObjectCounter.release(this);
throw new FatalListenerStartupException("Cannot prepare queue for listener. "
+ "Either the queue doesn't exist or the broker will not allow us to use it.", e);
}
}
} catch (IOException e) {
this.activeObjectCounter.release(this);
throw new FatalListenerStartupException("Cannot prepare queue for listener. "
+ "Either the queue doesn't exist or the broker will not allow us to use it.", e);
}
} while (passiveDeclareTries-- > 0);

try {
for (int i = 0; i < queues.length; i++) {
channel.basicConsume(queues[i], acknowledgeMode.isAutoAck(), consumer);
Expand All @@ -207,12 +230,21 @@ public void start() throws AmqpException {
public void stop() {
cancelled.set(true);
if (consumer != null && consumer.getChannel() != null && consumer.getConsumerTag() != null) {
RabbitUtils.closeMessageConsumer(consumer.getChannel(), consumer.getConsumerTag(), transactional);
try {
RabbitUtils.closeMessageConsumer(consumer.getChannel(), consumer.getConsumerTag(), transactional);
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Error closing consumer", e);
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Closing Rabbit Channel: " + channel);
}
logger.debug("Closing Rabbit Channel: " + channel);
// This one never throws exceptions...
RabbitUtils.closeChannel(channel);
deliveryTags.clear();
consumer = null;
}

private class InternalConsumer extends DefaultConsumer {
Expand All @@ -231,6 +263,14 @@ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig
deliveryTags.clear();
}

@Override
public void handleCancel(String consumerTag) throws IOException {
if (logger.isWarnEnabled()) {
logger.warn("Cancel received");
}
cancelReceived.set(true);
}

@Override
public void handleCancelOk(String consumerTag) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.listener;

/**
* Thrown when the broker cancels the consumer and the message
* queue is drained.
*
* @author Gary Russell
* @since 1.0.1
*
*/
public class ConsumerCancelledException extends RuntimeException {

private static final long serialVersionUID = 1L;

}