-
Notifications
You must be signed in to change notification settings - Fork 646
Description
We are getting this exception randomly at our code when send message flow is in spring transaction and message are not commit:
"com.rabbitmq.client.ChannelContinuationTimeoutException: Continuation call for method #method<tx.commit>() on channel AMQChannel(amqp://server@ip:5672/server,1) (#1) timed out": ["AMQChannel.java", 313, "com.rabbitmq.client.impl.AMQChannel.wrapTimeoutException", "AMQChannel.java", 295, "com.rabbitmq.client.impl.AMQChannel.privateRpc", "AMQChannel.java", 141, "com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc", "ChannelN.java", 1540, "com.rabbitmq.client.impl.ChannelN.txCommit", "ChannelN.java", 52, "com.rabbitmq.client.impl.ChannelN.txCommit", "", -1, "sun.reflect.GeneratedMethodAccessor452.invoke", "DelegatingMethodAccessorImpl.java", 43, "sun.reflect.DelegatingMethodAccessorImpl.invoke", "Method.java", 498, "java.lang.reflect.Method.invoke", "CachingConnectionFactory.java", 1190, "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke", "", -1, "com.sun.proxy.$Proxy941.txCommit", "RabbitResourceHolder.java", 153, "org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll"],
After this error occurred we are seeing that the spring transaction synchronizations are missing, but resource for rabbitResourceHolder is present , we are verifying this with a log we printed
Logging Code
private void logSpringTransactionDetailsForRabbitFlow(String routingKey, RabbitTemplate template) {
if (template.isChannelTransacted() && TransactionSynchronizationManager.isActualTransactionActive() && isRabbitResourceOrSyncNotPresent(template)) {
LOGGER.error("For routing key {} current spring transaction {}, have these synchronizations : {} and this resourcemap {}",
routingKey, TransactionSynchronizationManager.getCurrentTransactionName(),
TransactionSynchronizationManager.getSynchronizations(),
TransactionSynchronizationManager.getResourceMap());
}
}
private boolean isRabbitResourceOrSyncNotPresent(RabbitTemplate template) {
return !TransactionSynchronizationManager.hasResource(template.getConnectionFactory()) || !isRabbitSyncPresent();
}
private boolean isRabbitSyncPresent() {
for (TransactionSynchronization transactionSynchronization : TransactionSynchronizationManager.getSynchronizations()) {
if(transactionSynchronization.getClass().getName().contains("RabbitResourceSynchronization")) {
return true;
}
}
return false;
}
Output :
For routing key tk.news.alerts current spring transaction name_of_transaction, have these synchronizations : [] and this resourcemap {org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@616f930f=org.springframework.orm.jpa.EntityManagerHolder@61768137,CachingConnectionFactory [channelCacheSize=25, host=our_host_url, port=5672, active=true com.sample.messaging.impl.connection.CachingConnectionFactory@240953f5]=org.springframework.amqp.rabbit.connection.RabbitResourceHolder@692ba9fb}
From logs we can see that synchronization was not added but resources are already present.
So I suspect that because of ChannelContinuationTimeoutException, the first commit was not success, but resources added by this were still present in TransactionSynchronizationManager.getResource
And next time when we send new message in a new transaction, the condition TransactionSynchronizationManager.hasResource return true in below code, thus it do not add the new synchronization for the new message.
//class org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils
public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
ConnectionFactory connectionFactory, boolean synched) {
if (TransactionSynchronizationManager.hasResource(connectionFactory)
|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
}
TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
resourceHolder.setSynchronizedWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
connectionFactory));
}
return resourceHolder;
}
So can this code be enhanced so that orphan resources could be remove ? or is there any work around to handle such cases?