Skip to content
Prev Previous commit
Next Next commit
AMQP-101 Transactional channel tests using listener and channelAwareL…
…istener.
  • Loading branch information
Tomas Lukosius committed Feb 28, 2011
commit 33514d45ae6238e656635e098bafae327e5f7208
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.test.BrokerRunning;
Expand All @@ -33,6 +37,8 @@
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;

import com.rabbitmq.client.Channel;

@RunWith(Parameterized.class)
public class SimpleMessageListenerContainerIntegrationTests {

Expand Down Expand Up @@ -131,22 +137,56 @@ public void clear() throws Exception {
}
}

@Test
public void testPojoListenerSunnyDay() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
doSunnyDayTest(latch, new MessageListenerAdapter(new PojoListener(latch)));
}

@Test
public void testListenerSunnyDay() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
container = createContainer(new PojoListener(latch));
doSunnyDayTest(latch, new Listener(latch));
}

@Test
@Ignore ("RabbitMQ connection can not be obtained if running in Parameterized mode")
public void testChannelAwareListenerSunnyDay() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
doSunnyDayTest(latch, new ChannelAwareListener(latch));
}

@Test
public void testPojoListenerWithException() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
doListenerWithExceptionTest(latch, new MessageListenerAdapter(new PojoListener(latch, true)));
}

@Test
public void testListenerWithException() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
doListenerWithExceptionTest(latch, new Listener(latch, true));
}

@Test
@Ignore ("RabbitMQ connection can not be obtained if running in Parameterized mode")
public void testChannelAwareListenerWithException() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
doListenerWithExceptionTest(latch, new ChannelAwareListener(latch, true));
}

private void doSunnyDayTest(CountDownLatch latch, Object listener) throws Exception {
container = createContainer(listener);
for (int i = 0; i < messageCount; i++) {
template.convertAndSend(queue.getName(), i + "foo");
}
boolean waited = latch.await(Math.max(2, messageCount / 50), TimeUnit.SECONDS);
assertTrue("Timed out waiting for message", waited);
assertNull(template.receiveAndConvert(queue.getName()));
}

@Test
public void testListenerWithException() throws Exception {
CountDownLatch latch = new CountDownLatch(messageCount);
container = createContainer(new PojoListener(latch, true));

private void doListenerWithExceptionTest(CountDownLatch latch, Object listener) throws Exception {
container = createContainer(listener);
if (acknowledgeMode.isTransactionAllowed()) {
// Should only need one message if it is going to fail
for (int i = 0; i < concurrentConsumers; i++) {
Expand Down Expand Up @@ -176,7 +216,7 @@ public void testListenerWithException() throws Exception {

private SimpleMessageListenerContainer createContainer(Object listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(template.getConnectionFactory());
container.setMessageListener(new MessageListenerAdapter(listener));
container.setMessageListener(listener);
container.setQueueName(queue.getName());
container.setTxSize(txSize);
container.setPrefetchCount(txSize);
Expand Down Expand Up @@ -221,6 +261,71 @@ public void handleMessage(String value) {
}
}
}

public static class Listener implements MessageListener {
private AtomicInteger count = new AtomicInteger();

private final CountDownLatch latch;

private final boolean fail;

public Listener(CountDownLatch latch) {
this(latch, false);
}

public Listener(CountDownLatch latch, boolean fail) {
this.latch = latch;
this.fail = fail;
}

public void onMessage(Message message) {
String value = new String(message.getBody());
try {
int counter = count.getAndIncrement();
if (logger.isDebugEnabled() && counter % 500 == 0) {
logger.debug(value + counter);
}
if (fail) {
throw new RuntimeException("Planned failure");
}
} finally {
latch.countDown();
}
}
}

public static class ChannelAwareListener implements ChannelAwareMessageListener {
private AtomicInteger count = new AtomicInteger();

private final CountDownLatch latch;

private final boolean fail;

public ChannelAwareListener(CountDownLatch latch) {
this(latch, false);
}

public ChannelAwareListener(CountDownLatch latch, boolean fail) {
this.latch = latch;
this.fail = fail;
}

public void onMessage(Message message, Channel channel) throws Exception {
String value = new String(message.getBody());
try {
int counter = count.getAndIncrement();
if (logger.isDebugEnabled() && counter % 500 == 0) {
logger.debug(value + counter);
}
if (fail) {
throw new RuntimeException("Planned failure");
}
} finally {
latch.countDown();
}
}

}

@SuppressWarnings("serial")
private class TestTransactionManager extends AbstractPlatformTransactionManager {
Expand All @@ -243,5 +348,4 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc
}

}

}