Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
migrate web rabbitmq to azure service bus
  • Loading branch information
yiliuTo committed Mar 19, 2025
commit d154315a57fc36225a31e5c2c14dea22839642cc
16 changes: 14 additions & 2 deletions asset-manager/web/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<properties>
<aws-sdk.version>2.25.13</aws-sdk.version>
<version.spring.cloud.azure>5.19.0</version.spring.cloud.azure>
</properties>

<artifactId>assets-manager-web</artifactId>
Expand All @@ -27,8 +28,12 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter</artifactId>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-messaging-azure-servicebus</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down Expand Up @@ -79,6 +84,13 @@
<artifactId>azure-storage-blob-batch</artifactId>
<version>12.25.0</version>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-dependencies</artifactId>
<version>${version.spring.cloud.azure}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.microsoft.migration.assets;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.ApplicationPidFileWriter;

@SpringBootApplication
@EnableRabbit
@EnableAzureMessaging
public class AssetsManagerApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(AssetsManagerApplication.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,75 +1,84 @@
package com.microsoft.migration.assets.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.ResourceExistsException;
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions;
import com.azure.messaging.servicebus.administration.models.QueueProperties;
import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties;
import com.azure.spring.messaging.ConsumerIdentifier;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
public class RabbitConfig {
public static final String IMAGE_PROCESSING_QUEUE = "image-processing";

// Dead letter queue configuration for the retry mechanism
public static final String RETRY_EXCHANGE = "image-processing.retry";
public static final String RETRY_QUEUE = "image-processing.retry";
public static final String RETRY_ROUTING_KEY = "retry";
public static final int RETRY_DELAY_MS = 60000; // 1 minute delay

@Bean
public Queue imageProcessingQueue() {
return QueueBuilder.durable(IMAGE_PROCESSING_QUEUE
)
.withArgument("x-dead-letter-exchange", RETRY_EXCHANGE)
.withArgument("x-dead-letter-routing-key", RETRY_ROUTING_KEY)
.build();
}

@Bean
public Queue retryQueue() {
return QueueBuilder.durable(RETRY_QUEUE)
.withArgument("x-dead-letter-exchange", "")
.withArgument("x-dead-letter-routing-key", IMAGE_PROCESSING_QUEUE
)
.withArgument("x-message-ttl", RETRY_DELAY_MS)
.build();
}
public static final String RETRY_QUEUE = "retry-queue";
public static final Duration RETRY_QUEUE_TTL = Duration.ofMinutes(1);

@Bean
public DirectExchange retryExchange() {
return new DirectExchange(RETRY_EXCHANGE);
public ServiceBusAdministrationClient adminClient(AzureServiceBusProperties properties, TokenCredential credential) {
return new ServiceBusAdministrationClientBuilder()
.credential(properties.getFullyQualifiedNamespace(), credential)
.buildClient();
}

@Bean
public Binding retryBinding() {
return BindingBuilder
.bind(retryQueue())
.to(retryExchange())
.with(RETRY_ROUTING_KEY);
public QueueProperties retryQueue(ServiceBusAdministrationClient adminClient) {
try {
return adminClient.getQueue(RETRY_QUEUE);
} catch (ResourceNotFoundException e) {
try {
CreateQueueOptions options = new CreateQueueOptions()
.setDefaultMessageTimeToLive(RETRY_QUEUE_TTL)
.setDeadLetteringOnMessageExpiration(true);
return adminClient.createQueue(RETRY_QUEUE, options);
} catch (ResourceExistsException ex) {
// Queue was created by another instance in the meantime
return adminClient.getQueue(RETRY_QUEUE);
}
}
}

@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
public QueueProperties imageProcessingQueue(ServiceBusAdministrationClient adminClient, QueueProperties retryQueue) {
QueueProperties queue;
try {
queue = adminClient.getQueue(IMAGE_PROCESSING_QUEUE);
} catch (ResourceNotFoundException e) {
try {
CreateQueueOptions options = new CreateQueueOptions()
.setForwardDeadLetteredMessagesTo(RETRY_QUEUE);
queue = adminClient.createQueue(IMAGE_PROCESSING_QUEUE, options);
} catch (ResourceExistsException ex) {
// Queue was created by another instance in the meantime
queue = adminClient.getQueue(IMAGE_PROCESSING_QUEUE);
}
}

// Configure retry queue's DLQ forwarding now that image processing queue exists
try {
retryQueue.setForwardDeadLetteredMessagesTo(IMAGE_PROCESSING_QUEUE);
adminClient.updateQueue(retryQueue);
} catch (Exception ex) {
// Ignore update errors since basic functionality will still work
}

return queue;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setDefaultRequeueRejected(false);
return factory;
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
return identifier -> {
ProcessorProperties processorProperties = new ProcessorProperties();
processorProperties.setAutoComplete(false);
return processorProperties;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import com.microsoft.migration.assets.model.S3StorageItem;
import com.microsoft.migration.assets.repository.ImageMetadataRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
Expand All @@ -31,7 +32,7 @@
public class AwsS3Service implements StorageService {

private final BlobServiceClient blobServiceClient;
private final RabbitTemplate rabbitTemplate;
private final ServiceBusTemplate serviceBusTemplate;
private final ImageMetadataRepository imageMetadataRepository;

@Value("${{azure.storage.blob.container-name}")
Expand Down Expand Up @@ -75,7 +76,7 @@ public void uploadObject(MultipartFile file) throws IOException {
getStorageType(),
file.getSize()
);
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
serviceBusTemplate.send(IMAGE_PROCESSING_QUEUE, MessageBuilder.withPayload(message).build());

// Create and save metadata to database
ImageMetadata metadata = new ImageMetadata();
Expand All @@ -85,7 +86,7 @@ public void uploadObject(MultipartFile file) throws IOException {
metadata.setSize(file.getSize());
metadata.setS3Key(key);
metadata.setS3Url(generateUrl(key));

imageMetadataRepository.save(metadata);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package com.microsoft.migration.assets.service;

import com.microsoft.migration.assets.model.ImageProcessingMessage;
import com.rabbitmq.client.Channel;
import com.azure.spring.messaging.servicebus.implementation.core.annotation.ServiceBusListener;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

import static com.microsoft.migration.assets.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;

import java.io.IOException;

/**
* A backup message processor that serves as a monitoring and logging service.
Expand All @@ -25,28 +24,28 @@ public class BackupMessageProcessor {

/**
* Processes image messages from a backup queue for monitoring and resilience purposes.
* Uses the same RabbitMQ API pattern as the worker module.
* Uses the same Azure Service Bus API pattern as the worker module.
*/
@RabbitListener(queues = IMAGE_PROCESSING_QUEUE)
@ServiceBusListener(destination = IMAGE_PROCESSING_QUEUE)
public void processBackupMessage(final ImageProcessingMessage message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
@Header(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT) ServiceBusReceivedMessageContext context) {

try {
log.info("[BACKUP] Monitoring message: {}", message.getKey());
log.info("[BACKUP] Content type: {}, Storage: {}, Size: {}",
message.getContentType(), message.getStorageType(), message.getSize());

// Acknowledge the message
channel.basicAck(deliveryTag, false);
context.complete();
log.info("[BACKUP] Successfully processed message: {}", message.getKey());
} catch (Exception e) {
log.error("[BACKUP] Failed to process message: " + message.getKey(), e);

try {
// Reject the message and requeue it
channel.basicNack(deliveryTag, false, true);
// Reject the message and dead letter it
context.deadLetter();
log.warn("[BACKUP] Message requeued: {}", message.getKey());
} catch (IOException ackEx) {
} catch (Exception ackEx) {
log.error("[BACKUP] Error handling RabbitMQ acknowledgment: {}", message.getKey(), ackEx);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import com.microsoft.migration.assets.model.S3StorageItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
Expand All @@ -27,15 +28,15 @@ public class LocalFileStorageService implements StorageService {

private static final Logger logger = LoggerFactory.getLogger(LocalFileStorageService.class);

private final RabbitTemplate rabbitTemplate;
private final ServiceBusTemplate serviceBusTemplate;

@Value("${local.storage.directory:../storage}")
private String storageDirectory;

private Path rootLocation;

public LocalFileStorageService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
public LocalFileStorageService(ServiceBusTemplate serviceBusTemplate) {
this.serviceBusTemplate = serviceBusTemplate;
}

@PostConstruct
Expand Down Expand Up @@ -102,7 +103,7 @@ public void uploadObject(MultipartFile file) throws IOException {
getStorageType(),
file.getSize()
);
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
serviceBusTemplate.send(IMAGE_PROCESSING_QUEUE, MessageBuilder.withPayload(message).build());
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions asset-manager/web/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ azure.storage.blob.container-name=${AZURE_STORAGE_BLOB_CONTAINER_NAME}
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB

# RabbitMQ Configuration
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#Servicebus
spring.cloud.azure.credential.managed-identity-enabled=true
spring.cloud.azure.credential.client-id=${AZURE_CLIENT_ID}
spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
spring.cloud.azure.servicebus.entity-type=queue

# Database Configuration
spring.datasource.url=jdbc:postgresql://localhost:5432/assets_manager
Expand Down