Skip to content
Prev Previous commit
Next Next commit
AMQP-101: Added tests to check if ErrorHandler is invoked with multip…
…le messages for Pojo-listener, listener and channel-aware-listener.
  • Loading branch information
Tomas Lukosius committed Feb 27, 2011
commit 83ba72d98206f9b517d663a286b8284e8b4f692e
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
Expand All @@ -30,6 +31,8 @@
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
import org.springframework.util.ErrorHandler;

import com.rabbitmq.client.Channel;

public class MessageListenerContainerErrorHandlerIntegrationTests {

private static Log logger = LogFactory.getLog(MessageListenerContainerErrorHandlerIntegrationTests.class);
Expand All @@ -55,8 +58,9 @@ public void setUp() {
@Test
public void testErrorHandlerInvokeExceptionFromPojo() throws Exception {
int messageCount = 3;

doTest(messageCount, errorHandler, true, new Exception("Pojo exception"));
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new MessageListenerAdapter(new PojoThrowingExceptionListener(latch,
new Exception("Pojo exception"))));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
Expand All @@ -65,8 +69,9 @@ public void testErrorHandlerInvokeExceptionFromPojo() throws Exception {
@Test
public void testErrorHandlerInvokeRuntimeExceptionFromPojo() throws Exception {
int messageCount = 3;

doTest(messageCount, errorHandler, true, new RuntimeException("Pojo runtime exception"));
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new MessageListenerAdapter(new PojoThrowingExceptionListener(latch,
new RuntimeException("Pojo runtime exception"))));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
Expand All @@ -75,9 +80,9 @@ public void testErrorHandlerInvokeRuntimeExceptionFromPojo() throws Exception {
@Test
public void testErrorHandlerInvokeSpecRuntimeExceptionFromListener() throws Exception {
int messageCount = 3;

doTest(messageCount, errorHandler, false, new ListenerExecutionFailedException(
"Listener throws specific runtime exception", null));
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new ThrowingExceptionListener(latch,
new ListenerExecutionFailedException("Listener throws specific runtime exception", null)));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
Expand All @@ -98,15 +103,38 @@ public void testErrorHandlerInvokeRuntimeExceptionFromListener() throws Exceptio
// But if listener throws ListenerExecutionFailedException test will pass (see
// testErrorHandlerInvokeSpecRuntimeExceptionFromListener())
// even with multiple messages. Investigation required.
int messageCount = 1;
int messageCount = 3;
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new ThrowingExceptionListener(latch, new RuntimeException(
"Listener runtime exception")));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
}

doTest(messageCount, errorHandler, false, new RuntimeException("Listener runtime exception"));
@Test
public void testErrorHandlerInvokeExceptionFromChannelAwareListener() throws Exception {
int messageCount = 3;
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new ThrowingExceptionChannelAwareListener(latch, new Exception(
"Channel aware listener exception")));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
}

public void doTest(int messageCount, ErrorHandler errorHandler, boolean isPojo, Throwable exceptionToThrow)
@Test
public void testErrorHandlerInvokeRuntimeExceptionFromChannelAwareListener() throws Exception {
int messageCount = 3;
CountDownLatch latch = new CountDownLatch(messageCount);
doTest(messageCount, errorHandler, latch, new ThrowingExceptionChannelAwareListener(latch,
new RuntimeException("Channel aware listener runtime exception")));

// Verify that error handler was invoked
verify(errorHandler, times(messageCount)).handleError(any(Throwable.class));
}

public void doTest(int messageCount, ErrorHandler errorHandler, CountDownLatch latch, Object listener)
throws Exception {
int concurrentConsumers = 1;
RabbitTemplate template = createTemplate(concurrentConsumers);
Expand All @@ -116,24 +144,14 @@ public void doTest(int messageCount, ErrorHandler errorHandler, boolean isPojo,
template.convertAndSend(queue.getName(), i + "foo");
}

CountDownLatch latch = new CountDownLatch(messageCount);

// Create listener from pojo or from MessageListener implementation
MessageListener listener;
if (isPojo) {
listener = new MessageListenerAdapter(new PojoThrowingExceptionListener(latch, exceptionToThrow));
} else {
listener = new ThrowingExceptionListener(latch, (RuntimeException) exceptionToThrow);
}

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(listener);
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setChannelTransacted(false);
container.setConcurrentConsumers(concurrentConsumers);

container.setPrefetchCount(1);
container.setTxSize(1);
container.setPrefetchCount(messageCount);
container.setTxSize(messageCount);
container.setQueueName(queue.getName());
container.setErrorHandler(errorHandler);
container.afterPropertiesSet();
Expand Down Expand Up @@ -161,6 +179,9 @@ private RabbitTemplate createTemplate(int concurrentConsumers) {
return template;
}

// ///////////////
// Helper classes
// ///////////////
public static class PojoThrowingExceptionListener {
private CountDownLatch latch;
private Throwable exception;
Expand All @@ -170,10 +191,6 @@ public PojoThrowingExceptionListener(CountDownLatch latch, Throwable exception)
this.exception = exception;
}

public void reset(CountDownLatch latch) {
this.latch = latch;
}

public void handleMessage(String value) throws Throwable {
try {
logger.debug("Message in pojo: " + value);
Expand All @@ -194,14 +211,35 @@ public ThrowingExceptionListener(CountDownLatch latch, RuntimeException exceptio
this.exception = exception;
}

public void reset(CountDownLatch latch) {
public void onMessage(Message message) {
try {
String value = new String(message.getBody());
logger.debug("Message in listener: " + value);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
// Ignore this exception
}
throw exception;
} finally {
latch.countDown();
}
}
}

public static class ThrowingExceptionChannelAwareListener implements ChannelAwareMessageListener {
private CountDownLatch latch;
private Exception exception;

public ThrowingExceptionChannelAwareListener(CountDownLatch latch, Exception exception) {
this.latch = latch;
this.exception = exception;
}

public void onMessage(Message message) {
public void onMessage(Message message, Channel channel) throws Exception {
try {
String value = new String(message.getBody());
logger.debug("Message in listener: " + value);
logger.debug("Message in channel aware listener: " + value);
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
Expand Down