Nota:
Nesse momento, entraremos um pouco mais em detalhes sobre como o protocolo AMQP funciona e sobre a biblioteca de AMQP do spring, escrevi um “guia” bem básico sobre propriedades do protocolo, se quiser conferir, pode ver aqui 🙂
Ou nesse link
Contexto
O Spring AMQP
consiste em dois módulos principais: spring-amqp
e spring-rabbit
. O ‘spring-amqp’ contém o pacote org.springframework.amqp.core
, que trata das principais abstrações definidas no protocolo AMQP (RabbitMQ é um dealer, que implementa esse protocolo), esse pacote não se baseia em nenhuma biblioteca de clientes nem implementação de dealer.
Essas abstrações então são implementadas pelos módulos específicos dos brokers (spring-rabbit
). Teoricamente, como o AMQP é opera em nível de protocolo, você poderia utilizar o cliente do rabbit com outro broker, mas isso não é oficialmente suportado.
A mensagem
A mensagem ****definida no protocolo amqp é um conjunto de bytes e propriedades, passados separadamente. Para tornar o uso mais fácil, dentro do java juntamos isso em uma abstração chamada Message
public class Message {
personal closing MessageProperties messageProperties;
personal closing byte[] physique;
public Message(byte[] physique, MessageProperties messageProperties) {
this.physique = physique;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.physique;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
A alternate
A alternate é uma outra abstração simples, é basicamente o centro de distribuição de mensagens, que envia as mensagens de acordo com suas diretrizes:
public interface Change {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
Os tipos básicos de alternate são: direct
, matter
, fanout
e headers
. Você pode encontrar implementações para cada um dos tipos no pacote core.
A
Matter
alternate helps bindings with routing patterns which will embrace the ‘*’ and ‘#’ wildcards for ‘exactly-one’ and ‘zero-or-more’, respectively. TheFanout
alternate publishes to all queues which might be certain to it with out taking any routing key into consideration.
💡 A especificação AMQP outline uma alternate padrão não nomeada, todas as queues sem alternate vinculadas são automaticamente vinculadas à ela, com seus nomes como routing keys
Queues
A classe Queue
também representa uma abstração desse tipo no protocolo:
public class Queue {
personal closing String identify;
personal risky boolean sturdy;
personal risky boolean unique;
personal risky boolean autoDelete;
personal risky Map<String, Object> arguments;
/**
* The queue is sturdy, non-exclusive and non auto-delete.
*
* @param identify the identify of the queue.
*/
public Queue(String identify) {
this(identify, true, false, false);
}
// Getters e Setters omitidos
}
Bindings
Bindings são a relação entre filas e exchanges!
new Binding(someQueue, someDirectExchange, "foo.bar"); // direct alternate, routing keys fixas
new Binding(someQueue, someTopicExchange, "foo.*"); // matter alternate, usando wildcard
new Binding(someQueue, someFanoutExchange); // fanout
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
// BindingBuilder é a maneira bonitinha, eu gosto, mas importa estático!
💡 Uma instância de uma Binding
não trará alterações reais por ela mesma, para isso, deveremos usar a classe AmqpAdmin
ou definir as bindings usando a anotação @Bean
, é o que veremos a seguir
Já vimos nos exemplos anteriores como as exchanges, bindings e queues são criadas, a partir de agora, só criar!
Nossas configurações, ao invés de só possuir bean de Queue
, agora incluirão Exchanges
e Bindings
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RabbitMqConfiguration {
personal closing TicketQueueProperties ticketQueueProperties;
@Bean
public Queue queue(){
log.data("Searching for queue: {}", ticketQueueProperties.getName());
return new Queue(ticketQueueProperties.getName(), true);
}
@Bean Change ticketDirectExchange(){
closing String EXCHANGE_NAME = "ticket";
log.data("Creating alternate: ticket-exchange");
return new DirectExchange(EXCHANGE_NAME);
}
@Bean Binding ticketBinding(){
log.data("Create ticket binding");
return BindingBuilder.bind(queue()).to(ticketDirectExchange()).with(ticketQueueProperties.getName()).noargs();
}
}
💡 Para fazer direito, provavelmente também faríamos o refactor da nossa TicketQueueProperties
, provavelmente teríamos um RabbitMqProperties
, onde deixaríamos configurações de filas, exchanges e bindings de maneira mais organizada!
Perfeito, já vimos nossa alternate funcionando bonito! Tudo pronto!
Só que não! Lembre-se que o nosso writer está enviando mensagens com a routing key correta, mas para a alternate errada, vamos mudar o código para o seguinte:
rabbitTemplate.convertAndSend("direct.ticket",ticketQueueProperties.getName(),occasion.identify());
- Aqui deveríamos puxar esse nome da nossa ultimate
TicketQueueProperties
🙂 para evitar esse spaghetti
O AmqpTemplate
também outline vários métodos para enviar e receber mensagens que, no fim das contas, delegam tarefas para um MessageConverter
. O MessageConverter
fornece um método único para cada direção: um para converter para um Message
e outro para converter a partir de um Message
.
Definição da interface MessageConverter
:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
SimpleMessageConverter
A implementação padrão do technique MessageConverter
é chamada de SimpleMessageConverter
. Este é o conversor usado por uma instância de RabbitTemplate
se você não configurar explicitamente uma alternativa.
Converts a String to a
[TextMessage](https://jakarta.ee/specs/platform/9/apidocs/jakarta/jms/TextMessage.html)
, a byte array to a[BytesMessage](https://jakarta.ee/specs/platform/9/apidocs/jakarta/jms/BytesMessage.html)
, a Map to a[MapMessage](https://jakarta.ee/specs/platform/9/apidocs/jakarta/jms/MapMessage.html)
, and a Serializable object to a[ObjectMessage](https://jakarta.ee/specs/platform/9/apidocs/jakarta/jms/ObjectMessage.html)
(or vice versa).
Trocando o Conversor
Para trabalhar com objetos serializados e desserializados para JSON, vamos usar o Jackson2JsonMessageConverter
.
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
Colocaremos isso tanto no shopper quanto no producer 🙂
Falamos anteriormente do nossoTicketQueueProperties
, que poderíamos melhorá-lo, é o que vamos fazer, na realidade, vamos substituí-lo.
Primeiro de tudo, vamos definir um formato declarativo para filas, exchanges e bindings que nos agrade, para mim:
dealer:
queues:
ticket:
identify: default.ticket
exchanges:
ticket:
identify: direct.ticket
sort: direct
bindings:
ticket:
alternate: direct.ticket
queue: default.ticket
routingKey: default.ticket
Criando um ConfigurationProperties adequado
A partir disso, vamos mapear essas propriedades em courses de uma maneira adequada. Chamarei a classe de BrokerConfigurationProperties
:
bundle com.kaue.ticketservice.infrastructure.properties;
import jakarta.validation.constraints.NotEmpty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
import lombok.Knowledge;
@Configuration
@ConfigurationProperties(prefix = "dealer")
@Knowledge
public class BrokerConfigurationProperties {
personal Map<String, QueueProperties> queues;
personal Map<String, ExchangeProperties> exchanges;
personal Map<String, BindingProperties> bindings;
@Knowledge
public static class QueueProperties {
@NotEmpty
personal String identify;
}
@Knowledge
public static class ExchangeProperties {
@NotEmpty
personal String identify;
personal String sort;
}
@Knowledge
public static class BindingProperties {
@NotEmpty
personal String alternate;
@NotEmpty
personal String queue;
@NotEmpty
personal String routingKey;
}
}
- Possuímos 3 maps, estruturas que linkam uma chave à sua correspondente configuração,
Queue
,Change
ouBinding
Properties. - Fazemos o mapeamento padrão, usando :
@ConfigurationProperties(prefix = "dealer")
, até aqui, sem segredos 🙂
Transformando as propriedades em objetos!
A partir de agora, o terceiro passo pode parecer simples, devemos criar beans a partir das propriedades, isso não é um problema, pelo menos não se quisermos definir os Beans da maneira que fizemos antes, apesar disso, se quisermos definir uma lista de Queues, Exchanges e Bindings, devemos usar a classe Declarables
, e prover um bean para ela.
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
O exemplo acima, da documentação de referência do spring, é uma boa forma de exemplificar o uso mais simples de Declarables, vamos ver minha implementação em specific, que adiciona declarables de acordo com a BrokerConfigurationProperties
bundle com.kaue.ticketservice.infrastructure.configuration;
// ... ommitted
/**
* This courses creates all queues, exchanges and bindings based mostly on utility.yaml once they're wanted (referred to as by a shopper or posted a message into).
*/
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RabbitMqConfiguration {
personal closing BrokerConfigurationProperties brokerConfig;
personal closing Record<Queue> definedQueues = new ArrayList<>();
personal closing Record<Change> definedExchanges = new ArrayList<>();
@Bean
public Declarables queues() {
if (brokerConfig == null || brokerConfig.getQueues() == null) {
return new Declarables(); // Return an empty checklist if no queues are configured
}
var queueList = brokerConfig.getQueues().values().stream()
.filter(Objects::nonNull)
.map(queueProperties -> new Queue(queueProperties.getName(), true))
.toList();
definedQueues.addAll(queueList);
log.data("Declared queues");
return new Declarables(queueList);
}
@Bean
public Declarables exchanges() {
if (brokerConfig == null || brokerConfig.getExchanges() == null) {
return new Declarables(); // Return an empty checklist if no exchanges are configured
}
var exchangesList = brokerConfig.getExchanges().values().stream()
.filter(Objects::nonNull)
.map(exchangeProperties -> new DirectExchange(exchangeProperties.getName())) // todo use right alternate sort
.toList();
definedExchanges.addAll(exchangesList);
log.data("Declared exchanges");
return new Declarables(exchangesList);
}
@Bean
public Declarables bindings() {
if (brokerConfig == null || brokerConfig.getBindings() == null) {
return new Declarables();
}
var bindingsList = brokerConfig.getBindings().values().stream()
.map(bindingProperties -> {
log.data("Creating binding between alternate {} and queue {} with routing key {}",
bindingProperties.getExchange(), bindingProperties.getQueue(), bindingProperties.getRoutingKey());
Queue queue = findQueueByName(bindingProperties.getQueue());
Change alternate = findExchangeByName(bindingProperties.getExchange());
return BindingBuilder.bind(queue)
.to(alternate)
.with(bindingProperties.getRoutingKey())
.noargs();
})
.toList();
return new Declarables(bindingsList);
}
personal Queue findQueueByName (String queueName){
return definedQueues.stream()
.filter(queue -> queueName.equals(queue.getName()))
.findFirst()
.orElse(null);
}
personal Change findExchangeByName (String exchangeName){
return definedExchanges.stream()
.filter(alternate -> exchangeName.equals(alternate.getName()))
.findFirst()
.orElse(null);
}
}
Embora grande, a implementação é relativamente simples, usamos streams para transformar as propriedades em courses reais e retornamos o Declarable como um Bean
, um objeto gerenciado pelo spring.
No Spring, quando um método anotado como listener joga uma exception, as mensagens podem ser inseridas novamente na fila e reprocessadas, descartadas ou colocadas em uma Lifeless Letter Queue. Nada é devolvido ao emissor da mensagem.
Error Dealing with
Na versão 2.0 do Spring AMQP em diante, @RabbitLisetener tem 2 atributos: errorHandler
ereturnExceptions
, mas eles não são configurados por padrão.
Você pode usar o errorHandler
para prover um Bean de RabbitListenerErrorHandler
. Essa interface funcional tem um método:
@FunctionalInterface
public interface RabbitListenerErrorHandler {
Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception;
}
Aqui, por exemplo, poderíamos dizer que exceções de serviço ou fatais jogam exceções AmqpRejectAndDontRequeueException
, para evitar requeue.
As you may see, you may have entry to the uncooked message acquired from the container, the spring-messaging
Message<?>
object produced by the message converter, and the exception that was thrown by the listener (wrapped in aListenerExecutionFailedException
). The error handler can both return some end result (which is shipped because the reply) or throw the unique or a brand new exception (which is thrown to the container or returned to the sender, relying on thereturnExceptions
setting).
A citação acima comenta uma maneira de enviar exceptions de volta ao sender, se te interessar, pode ver aqui
Retries!
Podemos customizar e modificar configurações de retry indicadas dentro do nosso projeto, para isso usaremos o projeto spring-retry
, vamos ver uma configuração simples no Bean
do RabbitTemplate
:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = RetryTemplate.builder()
.maxAttempts(3)
.fixedBackoff(1000)
.retryOn(RemoteAccessException.class)
.construct();
retryTemplate.execute(ctx -> {
// ... do one thing
});
template.setRetryTemplate(retryTemplate);
return template;
}
Para mais informações, veja o [spring-retry](https://github.com/spring-projects/spring-retry#using-retrytemplate)
When a listener throws an exception, it’s wrapped in a
ListenerExecutionFailedException
. Usually the message is rejected and requeued by the dealer. SettingdefaultRequeueRejected
tofalse
causes messages to be discarded (or routed to a useless letter alternate).
Vamos tentar seguir o que o comentário acima da documentação do spring diz:
spring:
rabbitmq :
... adresses e outras configs
listener:
easy:
default-requeue-rejected: false
Depois dessa configuração, as mensagens quando possuem um erro são DELETADAS, desabilitando os retries. Isso provavelmente não é o que queremos, por isso, vamos estudar as DLQ’s.
Lifeless Letter Queues (DLQ) são filas que possuem mensagens que tiveram sua execução falhada em algum momento, o comportamento das DLQ’s pode ser configurado no próprio dealer.
Lifeless Letter Queues são úteis em sistemas mais críticos, onde necessitamos que um job rode de qualquer forma, onde podemos jogar mensagens de DLQ’s na alternate padrão novamente, ou pelo menos entendermos o porquê daquilo não ter sido executado, essas filas possuem diversas funções.
A maneira de definir useless letters é algo explicado dentro do protocolo AMQP, podemos apenas seguir essa configuração:
@Bean
Queue messagesQueue() {
return QueueBuilder.sturdy("queue-name")
.withArgument("x-dead-letter-exchange", "nome-exchange.dlx")
.withArgument("x-dead-letter-routing-key", "queue-name.dlq") // nao precisa ser o nome da queue, mas é comum para direct
.construct();
}
@Bean
Queue deadLetterQueue() {
return QueueBuilder.sturdy("queue-name.dlq").construct();
}
No fim das contas, uma useless letter queue é uma queue regular, e uma useless letter alternate também, portanto, se uma mensagem chegar na DLX (Lifeless Letter Change) e não tiver uma routing key correta, ela não chegará na fila, tudo regular por aqui.
💡 Se tivermos uma alternate como string vazia, ela usará a alternate padrão!
Existem diversas maneiras de trabalhar com rabbitMQ, e uma infinidade de propriedades e configurações não mostradas aqui, como por exemplo: Suggestions síncrono de exchanges e filas, Shoppers Assíncronos, Containers Diferentes, propriedades de requeue, monitoramento de shoppers, and so forth. Se algo fizer sentido para seu contexto, pode buscar no materials de referência do Spring 🙂
Mais sobre DLQ: https://www.youtube.com/watch?v=GgIJWxk_-jM
Mais sobre exception dealing with: https://www.baeldung.com/spring-amqp-error-handling
https://docs.spring.io/spring-amqp/reference/html/#template-retry