Um assunto recorrente quando falamos de RabbitMQ é a implementação de delay.
Embora tenhamos o delayed exchange, existem questões relacionadas a cluster que precisam ser consideradas e por isso não é algo que aborde, além de ser algo instalado à parte.
Existem muitos motivos para evitarmos esse tipo de exchange e na apresentação do Clayton Cavaleiro sobre “Servindo 1MM de mensagens simultâneas com RabbitMQ” vimos alguns desses problemas.
Assim vamos abordar uma solução alternativa, usando uma fila auxiliar.
Entendendo Oragon.RabbitMQ
O projeto consiste em implementar Minimal API’s para consumir filas do RabbitMQ, e aqui temos a diferença entre um consumidor normal e a implementação com Oragon.RabbitMQ.

A criação de AMQPResults que dão suporte à essa implementação sustentou a possibilidade de um AMQPResult capaz de enviar uma mensagem para uma fila de delay.
É um modelo similar ao encontrado nas Mininal API’s, e copiei para entregar a mesma experiência de uso.
Dessa forma, o MapQueue permite que você lide na Minimal API com os tratamentos necessários, sem proliferar, propagar e assim ferir as responsabilidades da sua camada de serviço/negócio com conteúdo relacionado ao RabbitMQ.

A implementação do DelayedResult é uma implementação que não faz parte da library, faz parte do ACADEMIA PAY e está licenciado sob MIT (licença permissiva) e é uma das possibilidades de uso.
Abaixo segue o código de exemplo.
// Licensed to LuizCarlosFaria, gaGO.io, Mensageria .NET, Cloud Native .NET and ACADEMIA.DEV under one or more agreements. // The ACADEMIA.DEV licenses this file to you under the MIT license. using global::Oragon.RabbitMQ.Consumer.Actions; using global::Oragon.RabbitMQ.Consumer; using RabbitMQ.Client; using System.Text; using System.Threading.Tasks; namespace AcademiaPay.Messaging; public class DelayedResult : IAMQPResult { private readonly TimeSpan ttl; /// <summary> /// Initializes a new instance of the <see cref="DelayedResult"/> class with the specified message and TTL. /// </summary> /// <param name="ttl">The TTL (Time-To-Live) as a TimeSpan.</param> public DelayedResult(TimeSpan ttl) { this.ttl = ttl; } public async Task ExecuteAsync(IAmqpContext context) { ArgumentNullException.ThrowIfNull(context, nameof(context)); var delayedQueueName = $"{context.QueueName}.delayed"; var properties = new BasicProperties { // Copy all properties from old BasicProperties to new BasicProperties AppId = context.Request.BasicProperties.AppId, ClusterId = context.Request.BasicProperties.ClusterId, ContentEncoding = context.Request.BasicProperties.ContentEncoding, ContentType = context.Request.BasicProperties.ContentType, CorrelationId = context.Request.BasicProperties.CorrelationId, DeliveryMode = context.Request.BasicProperties.DeliveryMode, Expiration = context.Request.BasicProperties.Expiration, Headers = context.Request.BasicProperties.Headers, MessageId = context.Request.BasicProperties.MessageId, Persistent = context.Request.BasicProperties.Persistent, Priority = context.Request.BasicProperties.Priority, ReplyTo = context.Request.BasicProperties.ReplyTo, Timestamp = context.Request.BasicProperties.Timestamp, Type = context.Request.BasicProperties.Type, UserId = context.Request.BasicProperties.UserId, ReplyToAddress = context.Request.BasicProperties.ReplyToAddress }; // Only TTL will be changed properties.Expiration = ((int)this.ttl.TotalMilliseconds).ToString(); await context.Channel.BasicPublishAsync( exchange: string.Empty, routingKey: delayedQueueName, mandatory: true, basicProperties: properties, body: context.Request.Body).AsTask().ConfigureAwait(false); await context.Channel.BasicAckAsync(context.Request.DeliveryTag, false).ConfigureAwait(false); } }
Para o mecanismo funcionar, a fila de delay precisa ter como deadletter uma rota válida para a fila de processamento.
Para esse mecanismo funcionar é necessário criar uma fila auxiliar para cada fila que possua delay.
Aproveite…
0 comentários